kudu-commits mailing list archives

From granthe...@apache.org
Subject [kudu] 04/04: [backup] Support column alterations between backups
Date Thu, 02 May 2019 12:45:55 GMT
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 70e575fb706290a059c0b8ad5b0c3b4be436f3b2
Author: Grant Henke <granthenke@apache.org>
AuthorDate: Mon Apr 29 09:14:39 2019 -0500

    [backup] Support column alterations between backups
    
    This patch adds support for column alterations between
    backups.
    
    When restoring each backup, the restore job compares the
    column ID map in that backup's metadata against the column
    ID map in the final (most recent) metadata. From this
    comparison it can detect and handle added, dropped, and
    renamed columns.
    
    Change-Id: I15e159caf485d83f4d5a36305d677828840bff03
    Reviewed-on: http://gerrit.cloudera.org:8080/13173
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Mike Percy <mpercy@apache.org>
---
 .../scala/org/apache/kudu/backup/KuduRestore.scala | 44 +++++++++++-
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 79 ++++++++++++++++++++++
 2 files changed, 121 insertions(+), 2 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index eb2a03e..bf6103c 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -21,12 +21,15 @@ import org.apache.kudu.client.AlterTableOptions
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.spark.kudu.KuduContext
 import org.apache.kudu.spark.kudu.RowConverter
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
+import scala.collection.JavaConverters._
+
 /**
  * The main class for a Kudu restore spark job.
  */
@@ -68,9 +71,7 @@ object KuduRestore {
 
         val backupSchema = io.dataSchema(TableMetadata.getKuduSchema(backup.metadata))
         val rowActionCol = backupSchema.fields.last.name
-
         val table = context.syncClient.openTable(restoreName)
-        val restoreSchema = io.dataSchema(table.getSchema)
 
         var data = session.sqlContext.read
           .format(backup.metadata.getDataFormat)
@@ -81,6 +82,10 @@ object KuduRestore {
           .na
           .fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
 
+        // Adjust for dropped and renamed columns.
+        data = adjustSchema(data, backup.metadata, lastMetadata, rowActionCol)
+        val restoreSchema = data.schema
+
         // Write the data to Kudu.
         data.queryExecution.toRdd.foreachPartition { internalRows =>
           val table = context.syncClient.openTable(restoreName)
@@ -151,6 +156,41 @@ object KuduRestore {
     })
   }
 
+  /**
+   * Returns a modified DataFrame with columns adjusted to match the lastMetadata.
+   */
+  private def adjustSchema(
+      df: DataFrame,
+      currentMetadata: TableMetadataPB,
+      lastMetadata: TableMetadataPB,
+      rowActionCol: String): DataFrame = {
+    log.info("Adjusting columns to handle alterations")
+    val idToName = lastMetadata.getColumnIdsMap.asScala.map(_.swap)
+    // Ignore the rowActionCol, which isn't a real column.
+    val currentColumns = currentMetadata.getColumnIdsMap.asScala.filter(_._1 != rowActionCol)
+    var result = df
+    // First drop all the columns that no longer exist.
+    // This is required to be sure a rename doesn't collide with an old column.
+    currentColumns.foreach {
+      case (colName, id) =>
+        if (!idToName.contains(id)) {
+          // If the last metadata doesn't contain the id, the column is dropped.
+          log.info(s"Dropping the column $colName from backup data")
+          result = result.drop(colName)
+        }
+    }
+    // Then rename all the columns that were renamed in the last metadata.
+    currentColumns.foreach {
+      case (colName, id) =>
+        if (idToName.contains(id) && idToName(id) != colName) {
+          // If the final name doesn't match the current name, the column is renamed.
+          log.info(s"Renamed the column $colName to ${idToName(id)} in backup data")
+          result = result.withColumnRenamed(colName, idToName(id))
+        }
+    }
+    result
+  }
+
   def main(args: Array[String]): Unit = {
     val options = RestoreOptions
       .parse(args)
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index c2b5356..dcc813a 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -26,6 +26,8 @@ import org.apache.kudu.client.PartitionSchema.HashBucketSchema
 import org.apache.kudu.client._
 import org.apache.kudu.ColumnSchema
 import org.apache.kudu.Schema
+import org.apache.kudu.Type
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
 import org.apache.kudu.spark.kudu._
 import org.apache.kudu.test.CapturingLogAppender
 import org.apache.kudu.test.RandomUtils
@@ -270,6 +272,83 @@ class TestKuduBackup extends KuduTestSuite {
     restoreAndValidateTable(tableName, rowCount)
   }
 
+  @Test
+  def testColumnAlterHandling(): Unit = {
+    // Create a basic table.
+    val tableName = "testColumnAlterHandling"
+    val columns = List(
+      new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+      new ColumnSchemaBuilder("col_a", Type.STRING).build(),
+      new ColumnSchemaBuilder("col_b", Type.STRING).build(),
+      new ColumnSchemaBuilder("col_c", Type.STRING).build(),
+      new ColumnSchemaBuilder("col_d", Type.STRING).build()
+    )
+    val schema = new Schema(columns.asJava)
+    val options = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+    var table = kuduClient.createTable(tableName, schema, options)
+    val session = kuduClient.newSession()
+
+    // Insert some rows and take a full backup.
+    Range(0, 10).foreach { i =>
+      val insert = table.newInsert
+      val row = insert.getRow
+      row.addInt("key", i)
+      row.addString("col_a", s"a$i")
+      row.addString("col_b", s"b$i")
+      row.addString("col_c", s"c$i")
+      row.addString("col_d", s"d$i")
+      session.apply(insert)
+    }
+    backupAndValidateTable(tableName, 10, false)
+
+    // Rename col_a to col_1 and add a new col_a to ensure the column ids and defaults
+    // work correctly. Also drop col_d and rename col_c to ensure collisions on renaming
+    // columns don't occur when processing columns from left to right.
+    kuduClient.alterTable(
+      tableName,
+      new AlterTableOptions()
+        .renameColumn("col_a", "col_1")
+        .addColumn(new ColumnSchemaBuilder("col_a", Type.STRING)
+          .defaultValue("default")
+          .build())
+        .dropColumn("col_b")
+        .dropColumn("col_d")
+        .renameColumn("col_c", "col_d")
+    )
+
+    // Insert more rows and take an incremental backup
+    table = kuduClient.openTable(tableName)
+    Range(10, 20).foreach { i =>
+      val insert = table.newInsert
+      val row = insert.getRow
+      row.addInt("key", i)
+      row.addString("col_1", s"a$i")
+      row.addString("col_d", s"c$i")
+      session.apply(insert)
+    }
+    backupAndValidateTable(tableName, 10, true)
+
+    // Restore the table and validate.
+    doRestore(createRestoreOptions(Seq(tableName)))
+
+    val restoreTable = kuduClient.openTable(s"$tableName-restore")
+    val scanner = kuduClient.newScannerBuilder(restoreTable).build()
+    val rows = scanner.asScala.toSeq
+
+    // Validate there are still 20 rows.
+    assertEquals(20, rows.length)
+    // Validate col_b is dropped from all rows.
+    assertTrue(rows.forall(!_.getSchema.hasColumn("col_b")))
+    // Validate the existing and renamed columns have the expected set of values.
+    val expectedSet = Range(0, 20).toSet
+    assertEquals(expectedSet, rows.map(_.getInt("key")).toSet)
+    assertEquals(expectedSet.map(i => s"a$i"), rows.map(_.getString("col_1")).toSet)
+    assertEquals(expectedSet.map(i => s"c$i"), rows.map(_.getString("col_d")).toSet)
+    // Validate the new col_a has all defaults.
+    assertTrue(rows.forall(_.getString("col_a") == "default"))
+  }
+
   def createRandomTable(): KuduTable = {
     val columnCount = random.nextInt(50) + 1 // At least one column.
     val keyColumnCount = random.nextInt(columnCount) + 1 // At least one key.


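For reference, the column-ID comparison performed by adjustSchema above can be illustrated
with a minimal standalone sketch. The sketch below uses plain Scala collections and
hard-coded column names and IDs that mirror the testColumnAlterHandling scenario; the
names, IDs, and the ColumnAdjustmentSketch object are illustrative assumptions only, not
taken from real backup metadata. The actual implementation applies the same classification
to a Spark DataFrame via drop and withColumnRenamed.

object ColumnAdjustmentSketch {
  def main(args: Array[String]): Unit = {
    // Column-ID map (name -> id) from the metadata of the backup being restored.
    val backupColumns = Map("key" -> 0, "col_a" -> 1, "col_b" -> 2, "col_c" -> 3, "col_d" -> 4)
    // Column-ID map from the final metadata, after the alterations in the test:
    // col_a renamed to col_1, col_b and col_d dropped, col_c renamed to col_d,
    // and a brand-new col_a added under a fresh ID.
    val lastColumns = Map("key" -> 0, "col_1" -> 1, "col_d" -> 3, "col_a" -> 5)

    // Invert the final map so the final name can be looked up by column ID.
    val idToFinalName = lastColumns.map(_.swap)

    // A backup column whose ID is absent from the final metadata was dropped.
    val dropped = backupColumns.collect {
      case (name, id) if !idToFinalName.contains(id) => name
    }
    // A backup column whose ID survives under a different name was renamed.
    val renamed = backupColumns.collect {
      case (name, id) if idToFinalName.get(id).exists(_ != name) => (name, idToFinalName(id))
    }

    println(s"drop from backup data:   ${dropped.toList.sorted}")   // List(col_b, col_d)
    println(s"rename in backup data:   ${renamed.toList.sorted}")   // List((col_a,col_1), (col_c,col_d))
  }
}

Columns that exist only in the final metadata, such as the re-added col_a, need no
adjustment in the backup data; when the restored rows are written without them, Kudu
fills in their default values, which is what the test above asserts.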