kudu-commits mailing list archives

From: granthe...@apache.org
Subject: [2/2] kudu git commit: Kudu Backup/Restore Spark Jobs
Date: Mon, 25 Jun 2018 19:45:43 GMT
Kudu Backup/Restore Spark Jobs

Adds a rough base implementation of Kudu backup and restore
Spark jobs. There are many TODOs indicating gaps, and more testing
and details remain to be finished. However, these base jobs work and
are in a functional state that can be committed and iterated on as
we build up and improve our backup functionality.

These jobs, as annotated, should be considered private, unstable,
and experimental.

The backup job can output the data of one or more tables to any
Spark-compatible path in any Spark-compatible format, the defaults
being HDFS and Parquet. Each table's data is written to a
subdirectory of the provided path, named with the URL-encoded table
name. Additionally, a JSON metadata file is written to each table's
directory containing the metadata needed to recreate the exported
table when restoring.
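For illustration, a minimal sketch of invoking the backup job
programmatically from a Spark application (assuming the kudu-backup
jar is on the classpath; the table name, backup root path, and master
address below are placeholders):

  import org.apache.spark.sql.SparkSession
  import org.apache.kudu.backup.{KuduBackup, KuduBackupOptions}

  val session = SparkSession.builder()
    .appName("Kudu Table Backup Example")
    .getOrCreate()

  // Placeholder table name, backup root, and master address.
  val options = KuduBackupOptions(
    tables = Seq("my_table"),
    path = "hdfs:///kudu-backups",
    kuduMasterAddresses = "kudu-master-1:7051")

  // Writes my_table's rows under <path>/<URL-encoded table name>/ in the
  // default Parquet format, plus a .kudu-metadata.json file in that directory.
  KuduBackup.run(options, session)

The same option names map to the command-line arguments parsed by
KuduBackupOptions below, so the job can also be launched through the
KuduBackup main class via spark-submit.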

The restore job reads the generated data and metadata, creates
“restore” tables with matching schemas, and reloads the data.
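A corresponding sketch of the restore invocation (again with
placeholder names); with the default options the job creates a new
table named with the "-restore" suffix and reloads the backed up rows
into it:

  import org.apache.spark.sql.SparkSession
  import org.apache.kudu.backup.{KuduRestore, KuduRestoreOptions}

  val session = SparkSession.builder()
    .appName("Kudu Table Restore Example")
    .getOrCreate()

  // Placeholder table name and path; createTables defaults to true, so the
  // table schema and partitioning are rebuilt from .kudu-metadata.json.
  val options = KuduRestoreOptions(
    tables = Seq("my_table"),
    path = "hdfs:///kudu-backups")

  // Reads the backup under <path>/<URL-encoded table name>/ and inserts the
  // rows into the newly created "my_table-restore" table.
  KuduRestore.run(options, session)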

The job arguments are a work in progress and will likely be enhanced
and simplified as we find what is useful and what isn’t through
performance and functional testing. More documentation will be
generated when the jobs are ready for general use.

Change-Id: If02183a2f833ffa0225eb7b0a35fc7531109e6f7
Reviewed-on: http://gerrit.cloudera.org:8080/10375
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/148a0c7b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/148a0c7b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/148a0c7b

Branch: refs/heads/master
Commit: 148a0c7bec6554724339a2235cbd723fb74be339
Parents: 7192d7f
Author: Grant Henke <granthenke@apache.org>
Authored: Thu May 10 10:34:45 2018 -0500
Committer: Grant Henke <granthenke@apache.org>
Committed: Mon Jun 25 19:43:51 2018 +0000

----------------------------------------------------------------------
 java/gradle/dependencies.gradle                 |   3 +
 java/kudu-backup/build.gradle                   |  61 ++++
 java/kudu-backup/pom.xml                        | 273 +++++++++++++++++
 java/kudu-backup/src/main/protobuf/backup.proto | 115 +++++++
 .../org/apache/kudu/backup/KuduBackup.scala     |  84 ++++++
 .../apache/kudu/backup/KuduBackupOptions.scala  |  99 +++++++
 .../org/apache/kudu/backup/KuduBackupRDD.scala  | 145 +++++++++
 .../org/apache/kudu/backup/KuduRestore.scala    |  96 ++++++
 .../apache/kudu/backup/KuduRestoreOptions.scala |  85 ++++++
 .../org/apache/kudu/backup/TableMetadata.scala  | 266 +++++++++++++++++
 .../src/test/resources/log4j.properties         |  23 ++
 .../org/apache/kudu/backup/TestKuduBackup.scala | 297 +++++++++++++++++++
 .../src/main/java/org/apache/kudu/Type.java     |   9 +
 .../org/apache/kudu/client/BaseKuduTest.java    |   5 +-
 .../java/org/apache/kudu/client/TestUtils.java  |  20 ++
 .../apache/kudu/spark/kudu/TestContext.scala    |  18 +-
 java/pom.xml                                    |   1 +
 java/settings.gradle                            |   1 +
 18 files changed, 1594 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 28dfd51..0e9ca92 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -55,6 +55,7 @@ versions += [
     protoc         : "3.5.1-1",
     scala          : "2.11.12",
     scalatest      : "3.0.5",
+    scopt          : "3.7.0",
     slf4j          : "1.7.25",
     spark          : "2.3.0",
     sparkAvro      : "4.0.0",
@@ -98,9 +99,11 @@ libs += [
     netty                : "io.netty:netty:$versions.netty",
     parquetHadoop        : "org.apache.parquet:parquet-hadoop:$versions.parquet",
     protobufJava         : "com.google.protobuf:protobuf-java:$versions.protobuf",
+    protobufJavaUtil     : "com.google.protobuf:protobuf-java-util:$versions.protobuf",
     protoc               : "com.google.protobuf:protoc:$versions.protoc",
     scalaLibrary         : "org.scala-lang:scala-library:$versions.scala",
     scalatest            : "org.scalatest:scalatest_$versions.scalaBase:$versions.scalatest",
+    scopt                : "com.github.scopt:scopt_$versions.scalaBase:$versions.scopt",
     slf4jApi             : "org.slf4j:slf4j-api:$versions.slf4j",
     slf4jLog4j12         : "org.slf4j:slf4j-log4j12:$versions.slf4j",
     sparkAvro            : "com.databricks:spark-avro_$versions.scalaBase:$versions.sparkAvro",

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/build.gradle
----------------------------------------------------------------------
diff --git a/java/kudu-backup/build.gradle b/java/kudu-backup/build.gradle
new file mode 100644
index 0000000..55e664c
--- /dev/null
+++ b/java/kudu-backup/build.gradle
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+apply plugin: "scala"
+apply from: "$rootDir/gradle/protobuf.gradle"
+apply from: "$rootDir/gradle/shadow.gradle"
+
+dependencies {
+  compile project(path: ":kudu-client", configuration: "shadow")
+  compile project(path: ":kudu-spark", configuration: "shadow")
+  compile libs.protobufJava
+  compile (libs.protobufJavaUtil) {
+    // Make sure wrong Guava version is not pulled in.
+    exclude group: "com.google.guava", module: "guava"
+  }
+  compile (libs.scopt)  {
+    // Make sure wrong Scala version is not pulled in.
+    exclude group: "org.scala-lang", module: "scala-library"
+  }
+
+  provided libs.scalaLibrary
+  provided libs.sparkAvro
+  provided libs.sparkCore
+  provided libs.sparkSql
+  provided libs.slf4jApi
+
+  optional libs.yetusAnnotations
+
+  testCompile project(path: ":kudu-client", configuration: "shadowTest")
+  testCompile project(path: ":kudu-spark", configuration: "test")
+  testCompile libs.junit
+  testCompile libs.log4j
+  testCompile libs.scalatest
+  testCompile libs.slf4jLog4j12
+}
+
+// Add protobuf files to the proto source set.
+sourceSets {
+  main {
+    proto {
+      srcDir "src/main/protobuf"
+    }
+  }
+}
+
+// Adjust the artifact name to match the maven build.
+archivesBaseName = "kudu-backup${versions.sparkBase}_${versions.scalaBase}"

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-backup/pom.xml b/java/kudu-backup/pom.xml
new file mode 100644
index 0000000..11b6ab0
--- /dev/null
+++ b/java/kudu-backup/pom.xml
@@ -0,0 +1,273 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.kudu</groupId>
+        <artifactId>kudu-parent</artifactId>
+        <version>1.8.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kudu-backup${spark.version.label}_${scala.binary.version}</artifactId>
+    <name>Kudu Backup</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-${spark.version.label}_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>${protobuf.version}</version>
+            <exclusions>
+                <exclusion>
+                    <!-- Make sure wrong Guava version is not pulled in. -->
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_${scala.binary.version}</artifactId>
+            <version>${scopt.version}</version>
+            <exclusions>
+                <exclusion>
+                    <!-- Make sure wrong Scala version is not pulled in. -->
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.databricks</groupId>
+            <artifactId>spark-avro_${scala.binary.version}</artifactId>
+            <version>${sparkavro.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.yetus</groupId>
+            <artifactId>audience-annotations</artifactId>
+            <version>${yetus.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-${spark.version.label}_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>${log4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>${scalatest.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <extensions>
+            <!-- Used in the protobuf plugin to find the right protoc artifact
+                 with the property os.detected.classifier -->
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>${maven-os-plugin.version}</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${maven-protoc-plugin.version}</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <!-- Documented at:
+                         https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
+                    <checkStaleness>true</checkStaleness>
+                    <protoSourceRoot>src/main/protobuf</protoSourceRoot>
+                    <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>generate-sources</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala-maven-plugin.version}</version>
+                <configuration>
+                    <charset>${project.build.sourceEncoding}</charset>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <!-- Emit warning and location for usages of features that should be imported explicitly. -->
+                        <arg>-feature</arg>
+                        <!-- Emit various static analysis warnings. -->
+                        <arg>-Xlint</arg>
+                    </args>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven-shade-plugin.version}</version>
+            </plugin>
+        </plugins>
+
+        <!-- This big ol' block of nonsense tells the m2e Eclipse plugin what
+            to do with maven plugins that don't have m2e "extensions" available.
+
+            It doesn't affect the Maven build at all. -->
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-antrun-plugin</artifactId>
+                                        <versionRange>[${maven-antrun-plugin.version},)</versionRange>
+                                        <goals>
+                                            <goal>run</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <execute>
+                                            <runOnIncremental>false</runOnIncremental>
+                                        </execute>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.xolstice.maven.plugins</groupId>
+                                        <artifactId>protobuf-maven-plugin</artifactId>
+                                        <versionRange>[${maven-protoc-plugin.version},)</versionRange>
+                                        <goals>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <execute>
+                                            <runOnIncremental>false</runOnIncremental>
+                                        </execute>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/main/protobuf/backup.proto
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup/src/main/protobuf/backup.proto
new file mode 100644
index 0000000..372c665
--- /dev/null
+++ b/java/kudu-backup/src/main/protobuf/backup.proto
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Protobufs which are used in the Kudu backup process.
+// Though these are similar to the kudu common protobufs,
+// they are specific to the kudu backup application and
+// should be kept separate.
+syntax = "proto3";
+package kudu.backup;
+
+option java_package = "org.apache.kudu.backup";
+
+import "google/protobuf/wrappers.proto";
+
+// Maps to the ColumnTypeAttributes class.
+// The fields are effectively 1 to 1 mappings of those in ColumnTypeAttributes.
+message ColumnTypeAttributesMetadataPB {
+  int32 precision = 1;
+  int32 scale = 2;
+}
+
+// Maps to the ColumnSchema class.
+// The fields are effectively 1 to 1 mappings of those in ColumnSchema.
+message ColumnMetadataPB {
+  string name = 1;
+  string type = 2;
+  ColumnTypeAttributesMetadataPB type_attributes = 3;
+  bool is_key = 4;
+  bool is_nullable = 5;
+  // Uses the StringValue wrapper so we can differentiate between "unset"
+  // and a default value.
+  google.protobuf.StringValue default_value = 6;
+  string encoding = 7;
+  string compression = 8;
+  int32 block_size = 9;
+}
+
+// A human-readable string representation of a column value for use
+// in the RangeBoundsMetadataPB.
+message ColumnValueMetadataPB {
+  string column_name = 1;
+  // This is a human-readable string value that can
+  // be parsed back into the appropriate type.
+  string value = 2;
+}
+
+// These bounds are used in CreateTableOptions to
+// generate valid range partition splits.
+// They are encoded from a PartialRow and decoded to a PartialRow.
+// The fields are repeated because each bound can have multiple column values.
+// The number of values equals the number of columns in the range partition key,
+// each column must exist in the schema, and a column cannot be repeated.
+message RangeBoundsMetadataPB {
+  repeated ColumnValueMetadataPB lower_bounds = 1;
+  repeated ColumnValueMetadataPB upper_bounds = 2;
+}
+
+// Maps to RangeSchema class.
+// We add the bounds field so we can generate matching splits
+// when restoring the tables. These splits are not currently
+// included in the RangeSchema class, but are instead provided in
+// the CreateTableOptions.
+message RangePartitionMetadataPB {
+  repeated string column_names = 1;
+  repeated RangeBoundsMetadataPB bounds = 2;
+}
+
+// Maps to HashBucketSchema class.
+// The fields are effectively 1 to 1 mappings of those in HashBucketSchema.
+message HashPartitionMetadataPB {
+  repeated string column_names = 1;
+  int32 num_buckets = 2;
+  int32 seed = 3;
+}
+
+// Maps to PartitionSchema class.
+// The fields are effectively 1 to 1 mappings of those in PartitionSchema.
+message PartitionMetadataPB {
+  repeated HashPartitionMetadataPB hash_partitions = 1;
+  RangePartitionMetadataPB range_partitions = 2;
+}
+
+// Represents the metadata of a table backup. This metadata is output
+// so we can understand and create a table that matches the backed up
+// table on restore.
+message TableMetadataPB {
+  // The starting point of a backup. A UNIX timestamp in milliseconds since the epoch.
+  int64 from_ms = 1;
+  // The end point of a backup. A UNIX timestamp in milliseconds since the epoch.
+  int64 to_ms = 2;
+  // The file format used to store the data.
+  string data_format = 3;
+  // The name of the table.
+  string table_name = 4;
+  // The replication factor of this table.
+  int32 num_replicas = 5;
+  // The metadata for the table's columns.
+  repeated ColumnMetadataPB columns = 6;
+  // The metadata for the table's partitions.
+  PartitionMetadataPB partitions = 7;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
new file mode 100644
index 0000000..56af3e9
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.net.{URLEncoder}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Path, Paths}
+
+import com.google.protobuf.util.JsonFormat
+import org.apache.hadoop.fs.{Path => HPath}
+import org.apache.kudu.backup.Backup.TableMetadataPB
+import org.apache.kudu.spark.kudu.KuduContext
+import org.apache.kudu.spark.kudu.SparkUtil._
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import org.slf4j.{Logger, LoggerFactory}
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+object KuduBackup {
+  val log: Logger = LoggerFactory.getLogger(getClass)
+
+  def run(options: KuduBackupOptions, session: SparkSession): Unit = {
+    val context = new KuduContext(options.kuduMasterAddresses, session.sparkContext)
+    val path = options.path
+    log.info(s"Backing up to path: $path")
+
+    // TODO: Make parallel so each table isn't processed serially?
+    options.tables.foreach { t =>
+      val table = context.syncClient.openTable(t)
+      val tablePath = Paths.get(path).resolve(URLEncoder.encode(t, "UTF-8"))
+
+      val rdd = new KuduBackupRDD(table, options, context, session.sparkContext)
+      val df = session.sqlContext.createDataFrame(rdd, sparkSchema(table.getSchema))
+      // TODO: Prefix path with the time? Maybe a backup "name" parameter defaulted to something?
+      // TODO: Take parameter for the SaveMode.
+      val writer = df.write.mode(SaveMode.ErrorIfExists)
+      // TODO: Restrict format option.
+      // TODO: We need to clean up partial output on failure, otherwise
+      // retries of the entire job will fail because the file already exists.
+      writer.format(options.format).save(tablePath.toString)
+
+      val tableMetadata = TableMetadata.getTableMetadata(table, options)
+      writeTableMetadata(tableMetadata, tablePath, session)
+    }
+  }
+
+  private def writeTableMetadata(metadata: TableMetadataPB, path: Path, session: SparkSession): Unit = {
+    val conf = session.sparkContext.hadoopConfiguration
+    val hPath = new HPath(path.resolve(TableMetadata.MetadataFileName).toString)
+    val fs = hPath.getFileSystem(conf)
+    val out = fs.create(hPath, /* overwrite= */ false)
+    val json = JsonFormat.printer().print(metadata)
+    out.write(json.getBytes(StandardCharsets.UTF_8))
+    out.flush()
+    out.close()
+  }
+
+  def main(args: Array[String]): Unit = {
+    val options = KuduBackupOptions.parse(args)
+      .getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
+
+    val session = SparkSession.builder()
+      .appName("Kudu Table Backup")
+      .getOrCreate()
+
+    run(options, session)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
new file mode 100644
index 0000000..04053cb
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.net.InetAddress
+
+import org.apache.kudu.client.AsyncKuduClient
+import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import scopt.OptionParser
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+case class KuduBackupOptions(tables: Seq[String],
+                             path: String,
+                             kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
+                             timestampMs: Long = System.currentTimeMillis(),
+                             format: String = KuduBackupOptions.DefaultFormat,
+                             scanBatchSize: Int = KuduBackupOptions.DefaultScanBatchSize,
+                             scanRequestTimeout: Long = KuduBackupOptions.DefaultScanRequestTimeout,
+                             scanPrefetching: Boolean = KuduBackupOptions.DefaultScanPrefetching)
+
+object KuduBackupOptions {
+  val DefaultFormat: String = "parquet"
+  val DefaultScanBatchSize: Int = 1024*1024*20 // 20 MiB
+  val DefaultScanRequestTimeout: Long = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
+  val DefaultScanPrefetching: Boolean = false // TODO: Add a test per KUDU-1260 and enable by default?
+
+  // TODO: clean up usage output.
+  // TODO: timeout configurations.
+  private val parser: OptionParser[KuduBackupOptions] = new OptionParser[KuduBackupOptions]("KuduBackup") {
+    opt[String]("path")
+      .action((v, o) => o.copy(path = v))
+      .text("The root path to output backup data. Accepts any Spark compatible path.")
+      .optional()
+
+    opt[String]("kuduMasterAddresses")
+      .action((v, o) => o.copy(kuduMasterAddresses = v))
+      .text("Comma-separated addresses of Kudu masters.")
+      .optional()
+
+    opt[Long]("timestampMs")
+      .action((v, o) => o.copy(timestampMs = v))
+      // TODO: Document the limitations based on cluster configuration (ex: ancient history watermark).
+      .text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
+      .optional()
+
+    opt[String]("format")
+      .action((v, o) => o.copy(format = v))
+      .text("The file format to use when writing the data.")
+      .optional()
+
+    opt[Int]("scanBatchSize")
+      .action((v, o) => o.copy(scanBatchSize = v))
+      .text("The maximum number of bytes returned by the scanner, on each batch.")
+      .optional()
+
+    opt[Int]("scanRequestTimeout")
+      .action((v, o) => o.copy(scanRequestTimeout = v))
+      .text("Sets how long each scan request to a server can last.")
+      .optional()
+
+    opt[Unit]("scanPrefetching")
+      .action( (_, o) => o.copy(scanPrefetching = true) )
+      .text("An experimental flag to enable pre-fetching data.")
+      .optional()
+
+    arg[String]("<table>...")
+      .unbounded()
+      .action( (v, o) => o.copy(tables = o.tables :+ v) )
+      .text("A list of tables to be backed up.")
+  }
+
+  /**
+    * Parses the passed arguments into Some[KuduBackupOptions].
+    *
+    * If the arguments are bad, an error message is displayed
+    * and None is returned.
+    *
+    * @param args The arguments to parse.
+    * @return Some[KuduBackupOptions] if parsing was successful, None if not.
+    */
+  def parse(args: Seq[String]): Option[KuduBackupOptions] = {
+    parser.parse(args, KuduBackupOptions(Seq(), null))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
new file mode 100644
index 0000000..e0acdaa
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kudu.backup
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kudu.Type
+import org.apache.kudu.client.AsyncKuduScanner.ReadMode
+import org.apache.kudu.client._
+import org.apache.kudu.spark.kudu.KuduContext
+import org.apache.kudu.util.HybridTimeUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+
+import scala.collection.JavaConverters._
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class KuduBackupRDD private[kudu](@transient val table: KuduTable,
+                                  @transient val options: KuduBackupOptions,
+                                  val kuduContext: KuduContext,
+                                  @transient val sc: SparkContext
+                                 ) extends RDD[Row](sc, Nil) {
+
+  // TODO: Split large tablets into smaller scan tokens?
+  override protected def getPartitions: Array[Partition] = {
+    val client = kuduContext.syncClient
+
+    // Set a hybrid time for the scan to ensure application consistency.
+    val timestampMicros = TimeUnit.MILLISECONDS.toMicros(options.timestampMs)
+    val hybridTime = HybridTimeUtil.physicalAndLogicalToHTTimestamp(timestampMicros, 0)
+
+    // Create the scan tokens for each partition.
+    val tokens = client.newScanTokenBuilder(table)
+      .cacheBlocks(false)
+      // TODO: Use fault tolerant scans to get mostly
+      // ordered results when KUDU-2466 is fixed.
+      // .setFaultTolerant(true)
+      .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+      .readMode(ReadMode.READ_AT_SNAPSHOT)
+      .snapshotTimestampRaw(hybridTime)
+      .batchSizeBytes(options.scanBatchSize)
+      .scanRequestTimeout(options.scanRequestTimeout)
+      .prefetching(options.scanPrefetching)
+      .build()
+
+    tokens.asScala.zipWithIndex.map {
+      case (token, index) =>
+        // TODO: Support backups from any replica or followers only.
+        // Always run on the leader for data locality.
+        val leaderLocation = token.getTablet.getLeaderReplica.getRpcHost
+        KuduBackupPartition(index, token.serialize(), Array(leaderLocation))
+    }.toArray
+  }
+
+  // TODO: Do we need a custom spark partitioner for any guarantees?
+  // override val partitioner = None
+
+  override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
+    val client: KuduClient = kuduContext.syncClient
+    val partition: KuduBackupPartition = part.asInstanceOf[KuduBackupPartition]
+    // TODO: Get deletes and updates for incremental backups.
+    val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
+    new RowIterator(scanner)
+  }
+
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    partition.asInstanceOf[KuduBackupPartition].locations
+  }
+}
+
+private case class KuduBackupPartition(index: Int,
+                                       scanToken: Array[Byte],
+                                       locations: Array[String]) extends Partition
+
+/**
+  * This iterator wraps a KuduScanner, converts the returned RowResults into a
+  * Spark Row, and allows iterating over those scanned results.
+  *
+  * The Spark RDD abstraction has an abstract compute method, implemented in KuduBackupRDD,
+  * that takes the job partitions and task context and expects to return an Iterator[Row].
+  * This implementation facilitates that.
+  */
+private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row] {
+
+  private var currentIterator: RowResultIterator = _
+
+  override def hasNext: Boolean = {
+    while ((currentIterator != null && !currentIterator.hasNext && scanner.hasMoreRows) ||
+      (scanner.hasMoreRows && currentIterator == null)) {
+      if (TaskContext.get().isInterrupted()) {
+        throw new RuntimeException("KuduBackup spark task interrupted")
+      }
+      currentIterator = scanner.nextRows()
+    }
+    currentIterator.hasNext
+  }
+
+  // TODO: Use a more "raw" encoding for efficiency?
+  private def get(rowResult: RowResult, i: Int): Any = {
+    if (rowResult.isNull(i)) null
+    else rowResult.getColumnType(i) match {
+      case Type.BOOL => rowResult.getBoolean(i)
+      case Type.INT8 => rowResult.getByte(i)
+      case Type.INT16 => rowResult.getShort(i)
+      case Type.INT32 => rowResult.getInt(i)
+      case Type.INT64 => rowResult.getLong(i)
+      case Type.UNIXTIME_MICROS => rowResult.getTimestamp(i)
+      case Type.FLOAT => rowResult.getFloat(i)
+      case Type.DOUBLE => rowResult.getDouble(i)
+      case Type.STRING => rowResult.getString(i)
+      case Type.BINARY => rowResult.getBinaryCopy(i)
+      case Type.DECIMAL => rowResult.getDecimal(i)
+      case _ => throw new RuntimeException(s"Unsupported column type: ${rowResult.getColumnType(i)}")
+    }
+  }
+
+  // TODO: There may be an old KuduRDD implementation where we did some
+  // sort of zero copy/object pool pattern for performance (we could use that here).
+  override def next(): Row = {
+    val rowResult = currentIterator.next()
+    val columnCount = rowResult.getColumnProjection.getColumnCount
+    val columns = Array.ofDim[Any](columnCount)
+    for (i <- 0 until columnCount) {
+      columns(i) = get(rowResult, i)
+    }
+    Row.fromSeq(columns)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
new file mode 100644
index 0000000..bd0cc3c
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.io.InputStreamReader
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Path, Paths}
+
+import com.google.common.io.CharStreams
+import com.google.protobuf.util.JsonFormat
+import org.apache.hadoop.fs.{FileSystem, Path => HPath}
+import org.apache.kudu.backup.Backup.TableMetadataPB
+import org.apache.kudu.spark.kudu.{KuduContext, KuduWriteOptions}
+import org.apache.spark.sql.SparkSession
+import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import org.slf4j.{Logger, LoggerFactory}
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+object KuduRestore {
+  val log: Logger = LoggerFactory.getLogger(getClass)
+
+  def run(options: KuduRestoreOptions, session: SparkSession): Unit = {
+    val context = new KuduContext(options.kuduMasterAddresses, session.sparkContext)
+    val path = options.path
+    log.info(s"Restoring from path: $path")
+
+    // TODO: Make parallel so each table isn't processed serially.
+    options.tables.foreach { t =>
+      val tablePath = Paths.get(path).resolve(URLEncoder.encode(t, "UTF-8"))
+      val metadataPath = getMetadataPath(t, options)
+      val metadata = readTableMetadata(metadataPath, session)
+      val restoreName = s"${metadata.getTableName}${options.tableSuffix}"
+      val table =
+        if (options.createTables) {
+          // Read the metadata and generate a schema.
+          val schema = TableMetadata.getKuduSchema(metadata)
+          val createTableOptions = TableMetadata.getCreateTableOptions(metadata)
+          context.createTable(restoreName, schema, createTableOptions)
+        } else {
+          context.syncClient.openTable(restoreName)
+        }
+
+      // TODO: Restrict format option.
+      val df = session.sqlContext.read.format(metadata.getDataFormat).load(tablePath.toString)
+      val writeOptions = new KuduWriteOptions(ignoreDuplicateRowErrors = false, ignoreNull = false)
+      // TODO: Use client directly for more control?
+      // (session timeout, consistency mode, flush interval, mutation buffer space)
+      context.insertRows(df, restoreName, writeOptions)
+    }
+  }
+
+  private def getMetadataPath(tableName: String, options: KuduRestoreOptions): Path = {
+    val rootPath = if (options.metadataPath.isEmpty) options.path else options.metadataPath
+    Paths.get(rootPath).resolve(tableName)
+  }
+
+  private def readTableMetadata(path: Path, session: SparkSession): TableMetadataPB = {
+    val conf = session.sparkContext.hadoopConfiguration
+    val hPath = new HPath(path.resolve(TableMetadata.MetadataFileName).toString)
+    val fs = hPath.getFileSystem(conf)
+    val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)
+    val json = CharStreams.toString(in)
+    in.close()
+    val builder = TableMetadataPB.newBuilder()
+    JsonFormat.parser().merge(json, builder)
+    builder.build()
+  }
+
+  def main(args: Array[String]): Unit = {
+    val options = KuduRestoreOptions.parse(args)
+      .getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
+
+    val session = SparkSession.builder()
+      .appName("Kudu Table Restore")
+      .getOrCreate()
+
+    run(options, session)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
new file mode 100644
index 0000000..2cd1d4d
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.backup
+
+import java.net.InetAddress
+
+import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import scopt.OptionParser
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+case class KuduRestoreOptions(tables: Seq[String],
+                              path: String,
+                              kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
+                              tableSuffix: String = KuduRestoreOptions.DefaultTableSuffix,
+                              createTables: Boolean = KuduRestoreOptions.DefaultCreateTables,
+                              metadataPath: String = "")
+
+object KuduRestoreOptions {
+  val DefaultTableSuffix: String = "-restore"
+  val DefaultCreateTables: Boolean = true
+
+  // TODO: clean up usage output.
+  // TODO: timeout configurations.
+  private val parser: OptionParser[KuduRestoreOptions] = new OptionParser[KuduRestoreOptions]("KuduRestore") {
+    opt[String]("path")
+      .action((v, o) => o.copy(path = v))
+      .text("The root path to the backup data. Accepts any Spark compatible path.")
+      .optional()
+
+    opt[String]("kuduMasterAddresses")
+      .action((v, o) => o.copy(kuduMasterAddresses = v))
+      .text("Comma-separated addresses of Kudu masters.")
+      .optional()
+
+    opt[Boolean]("createTables")
+      .action( (v, o) => o.copy(createTables = v) )
+      .text("true to create tables during restore, false if they already exist.")
+      .optional()
+
+    opt[String]("tableSuffix")
+      .action((v, o) => o.copy(tableSuffix = v))
+      .text("The suffix to add to the restored table names. Only used when createTables is true.")
+      .optional()
+
+    opt[String]("metadataPath")
+      .action((v, o) => o.copy(metadataPath = v))
+      .text("The root path to look for table metadata. This can be used to change the properties of " +
+        "the tables created during restore. By default the backup path is used. " +
+        "Only used when createTables is true.")
+      .optional()
+
+    arg[String]("<table>...")
+      .unbounded()
+      .action( (v, o) => o.copy(tables = o.tables :+ v) )
+      .text("A list of tables to be restored.")
+  }
+
+  /**
+    * Parses the passed arguments into Some[KuduRestoreOptions].
+    *
+    * If the arguments are bad, an error message is displayed
+    * and None is returned.
+    *
+    * @param args The arguments to parse.
+    * @return Some[KuduRestoreOptions] if parsing was successful, None if not.
+    */
+  def parse(args: Seq[String]): Option[KuduRestoreOptions] = {
+    parser.parse(args, KuduRestoreOptions(Seq(), null))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
new file mode 100644
index 0000000..9110e5f
--- /dev/null
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kudu.backup
+
+import java.math.BigDecimal
+import java.util.Base64
+
+import com.google.protobuf.StringValue
+import org.apache.kudu.backup.Backup._
+import org.apache.kudu.ColumnSchema.{ColumnSchemaBuilder, CompressionAlgorithm, Encoding}
+import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
+import org.apache.kudu.client.{CreateTableOptions, KuduTable, PartialRow}
+import org.apache.kudu.{ColumnSchema, Schema, Type}
+import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+
+import scala.collection.JavaConverters._
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+object TableMetadata {
+
+  val MetadataFileName = ".kudu-metadata.json"
+
+  def getTableMetadata(table: KuduTable, options: KuduBackupOptions): TableMetadataPB = {
+    val columns = table.getSchema.getColumns.asScala.map { col =>
+      val builder = ColumnMetadataPB.newBuilder()
+        .setName(col.getName)
+        .setType(col.getType.name())
+        .setIsKey(col.isKey)
+        .setIsNullable(col.isNullable)
+        .setEncoding(col.getEncoding.toString)
+        .setCompression(col.getCompressionAlgorithm.toString)
+        .setBlockSize(col.getDesiredBlockSize)
+      if (col.getTypeAttributes != null) {
+        builder.setTypeAttributes(getTypeAttributesMetadata(col))
+      }
+      if (col.getDefaultValue != null) {
+        builder.setDefaultValue(StringValue.of(valueToString(col.getDefaultValue, col.getType)))
+      }
+      builder.build()
+    }
+
+    TableMetadataPB.newBuilder()
+      .setFromMs(0) // TODO: fromMs is always zero until we support incremental backups
+      .setToMs(options.timestampMs)
+      .setDataFormat(options.format)
+      .setTableName(table.getName)
+      .addAllColumns(columns.asJava)
+      .setNumReplicas(table.getNumReplicas)
+      .setPartitions(getPartitionMetadata(table))
+      .build()
+  }
+
+  private def getTypeAttributesMetadata(col: ColumnSchema): ColumnTypeAttributesMetadataPB = {
+    val attributes = col.getTypeAttributes
+    ColumnTypeAttributesMetadataPB.newBuilder()
+      .setPrecision(attributes.getPrecision)
+      .setScale(attributes.getScale)
+      .build()
+  }
+
+  private def getPartitionMetadata(table: KuduTable): PartitionMetadataPB = {
+    val hashPartitions = getHashPartitionsMetadata(table)
+    val rangePartitions = getRangePartitionMetadata(table)
+    PartitionMetadataPB.newBuilder()
+      .addAllHashPartitions(hashPartitions.asJava)
+      .setRangePartitions(rangePartitions)
+      .build()
+  }
+
+  private def getHashPartitionsMetadata(table: KuduTable): Seq[HashPartitionMetadataPB] = {
+    val tableSchema = table.getSchema
+    val partitionSchema = table.getPartitionSchema
+    partitionSchema.getHashBucketSchemas.asScala.map { hs =>
+      val columnNames = hs.getColumnIds.asScala.map { id =>
+        getColumnById(tableSchema, id).getName
+      }
+      HashPartitionMetadataPB.newBuilder()
+        .addAllColumnNames(columnNames.asJava)
+        .setNumBuckets(hs.getNumBuckets)
+        .setSeed(hs.getSeed)
+        .build()
+    }
+  }
+
+  private def getRangePartitionMetadata(table: KuduTable): RangePartitionMetadataPB = {
+    val tableSchema = table.getSchema
+    val partitionSchema = table.getPartitionSchema
+    val columnNames = partitionSchema.getRangeSchema.getColumnIds.asScala.map { id =>
+      getColumnById(tableSchema, id).getName
+    }
+
+    val bounds = table.getRangePartitions(table.getAsyncClient.getDefaultOperationTimeoutMs)
+      .asScala.map { p =>
+        val lowerValues = getBoundValues(p.getDecodedRangeKeyStart(table), columnNames, tableSchema)
+        val upperValues = getBoundValues(p.getDecodedRangeKeyEnd(table), columnNames, tableSchema)
+        RangeBoundsMetadataPB.newBuilder()
+          .addAllUpperBounds(upperValues.asJava)
+          .addAllLowerBounds(lowerValues.asJava)
+          .build()
+      }
+    RangePartitionMetadataPB.newBuilder()
+      .addAllColumnNames(columnNames.asJava)
+      .addAllBounds(bounds.asJava)
+      .build()
+  }
+
+  private def getColumnById(schema: Schema, colId: Int): ColumnSchema = {
+    schema.getColumnByIndex(schema.getColumnIndex(colId))
+  }
+
+  private def getBoundValues(bound: PartialRow, columnNames: Seq[String], schema: Schema): Seq[ColumnValueMetadataPB] = {
+    columnNames.filter(bound.isSet).map { col =>
+      val colType = schema.getColumn(col).getType
+      val value = getValue(bound, col, colType)
+      ColumnValueMetadataPB.newBuilder()
+        .setColumnName(col)
+        .setValue(valueToString(value, colType))
+        .build()
+    }
+  }
+
+  private def getPartialRow(values: Seq[ColumnValueMetadataPB], schema: Schema): PartialRow = {
+    val row = schema.newPartialRow()
+    values.foreach { v =>
+       val colType = schema.getColumn(v.getColumnName).getType
+       addValue(valueFromString(v.getValue, colType), row, v.getColumnName, colType)
+    }
+    row
+  }
+
+  def getKuduSchema(metadata: TableMetadataPB): Schema = {
+    val columns = metadata.getColumnsList.asScala.map { col =>
+      val colType = Type.getTypeForName(col.getType)
+      val builder = new ColumnSchemaBuilder(col.getName, colType)
+        .key(col.getIsKey)
+        .nullable(col.getIsNullable)
+        .encoding(Encoding.valueOf(col.getEncoding))
+        .compressionAlgorithm(CompressionAlgorithm.valueOf(col.getCompression))
+        .desiredBlockSize(col.getBlockSize)
+
+      if (col.hasDefaultValue) {
+        val value = valueFromString(col.getDefaultValue.getValue, colType)
+        builder.defaultValue(value)
+      }
+
+      if (col.hasTypeAttributes) {
+        val attributes = col.getTypeAttributes
+        builder.typeAttributes(
+          new ColumnTypeAttributesBuilder()
+            .precision(attributes.getPrecision)
+            .scale(attributes.getScale)
+            .build()
+        )
+      }
+      builder.build()
+    }
+    new Schema(columns.asJava)
+  }
+
+  private def getValue(row: PartialRow, columnName: String, colType: Type): Any = {
+    colType match {
+      case Type.BOOL => row.getBoolean(columnName)
+      case Type.INT8 => row.getByte(columnName)
+      case Type.INT16 => row.getShort(columnName)
+      case Type.INT32 => row.getInt(columnName)
+      case Type.INT64 | Type.UNIXTIME_MICROS => row.getLong(columnName)
+      case Type.FLOAT => row.getFloat(columnName)
+      case Type.DOUBLE => row.getDouble(columnName)
+      case Type.STRING => row.getString(columnName)
+      case Type.BINARY => row.getBinary(columnName)
+      case Type.DECIMAL => row.getDecimal(columnName)
+      case _ => throw new IllegalArgumentException(s"Unsupported column type: $colType")
+    }
+  }
+
+  private def addValue(value: Any, row: PartialRow, columnName: String, colType: Type): Unit = {
+    colType match {
+      case Type.BOOL => row.addBoolean(columnName, value.asInstanceOf[Boolean])
+      case Type.INT8 => row.addByte(columnName, value.asInstanceOf[Byte])
+      case Type.INT16 => row.addShort(columnName, value.asInstanceOf[Short])
+      case Type.INT32 => row.addInt(columnName, value.asInstanceOf[Int])
+      case Type.INT64 | Type.UNIXTIME_MICROS => row.addLong(columnName, value.asInstanceOf[Long])
+      case Type.FLOAT => row.addFloat(columnName, value.asInstanceOf[Float])
+      case Type.DOUBLE => row.addDouble(columnName, value.asInstanceOf[Double])
+      case Type.STRING => row.addString(columnName, value.asInstanceOf[String])
+      case Type.BINARY => row.addBinary(columnName, value.asInstanceOf[Array[Byte]])
+      case Type.DECIMAL => row.addDecimal(columnName, value.asInstanceOf[BigDecimal])
+      case _ => throw new IllegalArgumentException(s"Unsupported column type: $colType")
+    }
+  }
+
+  private def valueToString(value: Any, colType: Type): String = {
+    colType match {
+      case Type.BOOL =>
+        String.valueOf(value.asInstanceOf[Boolean])
+      case Type.INT8 =>
+        String.valueOf(value.asInstanceOf[Byte])
+      case Type.INT16 =>
+        String.valueOf(value.asInstanceOf[Short])
+      case Type.INT32 =>
+        String.valueOf(value.asInstanceOf[Int])
+      case Type.INT64 | Type.UNIXTIME_MICROS =>
+        String.valueOf(value.asInstanceOf[Long])
+      case Type.FLOAT =>
+        String.valueOf(value.asInstanceOf[Float])
+      case Type.DOUBLE =>
+        String.valueOf(value.asInstanceOf[Double])
+      case Type.STRING =>
+        value.asInstanceOf[String]
+      case Type.BINARY =>
+        Base64.getEncoder.encodeToString(value.asInstanceOf[Array[Byte]])
+      case Type.DECIMAL =>
+        value.asInstanceOf[BigDecimal].toString // TODO: Explicitly control print format
+      case _ => throw new IllegalArgumentException(s"Unsupported column type: $colType")
+    }
+  }
+
+  private def valueFromString(value: String, colType: Type): Any = {
+    colType match {
+      case Type.BOOL => value.toBoolean
+      case Type.INT8 => value.toByte
+      case Type.INT16 => value.toShort
+      case Type.INT32 => value.toInt
+      case Type.INT64 | Type.UNIXTIME_MICROS => value.toLong
+      case Type.FLOAT => value.toFloat
+      case Type.DOUBLE => value.toDouble
+      case Type.STRING => value
+      case Type.BINARY => Base64.getDecoder.decode(value)
+      case Type.DECIMAL => new BigDecimal(value) // TODO: Explicitly pass scale
+      case _ => throw new IllegalArgumentException(s"Unsupported column type: $colType")
+    }
+  }
+
+  def getCreateTableOptions(metadata: TableMetadataPB): CreateTableOptions = {
+    val schema = getKuduSchema(metadata)
+    val options = new CreateTableOptions()
+    options.setNumReplicas(metadata.getNumReplicas)
+    metadata.getPartitions.getHashPartitionsList.asScala.foreach { hp =>
+      options.addHashPartitions(hp.getColumnNamesList, hp.getNumBuckets, hp.getSeed)
+    }
+    val rangePartitionColumns = metadata.getPartitions.getRangePartitions.getColumnNamesList
+    options.setRangePartitionColumns(rangePartitionColumns)
+    metadata.getPartitions.getRangePartitions.getBoundsList.asScala.foreach { b =>
+      val lower = getPartialRow(b.getLowerBoundsList.asScala, schema)
+      val upper = getPartialRow(b.getUpperBoundsList.asScala, schema)
+      options.addRangePartition(lower, upper)
+    }
+    options
+  }
+
+}
\ No newline at end of file
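
As a sketch of how the public helpers above fit together (illustrative only, not part of the
commit): getKuduSchema and getCreateTableOptions give a caller everything needed to recreate a
table from its serialized metadata. The object name TableMetadata, the variable names, and the
target table name below are assumptions for the example.

    // Hypothetical restore-side usage: `metadata` is an already-deserialized
    // TableMetadataPB and `client` is a connected KuduClient.
    val schema = TableMetadata.getKuduSchema(metadata)           // columns, defaults, type attributes
    val options = TableMetadata.getCreateTableOptions(metadata)  // replicas plus hash/range partitioning
    client.createTable("backup_demo-restore", schema, options)   // illustrative table name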

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/test/resources/log4j.properties b/java/kudu-backup/src/test/resources/log4j.properties
new file mode 100644
index 0000000..535996c
--- /dev/null
+++ b/java/kudu-backup/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger = WARN, out
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
+
+log4j.logger.org.apache.kudu = INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
new file mode 100644
index 0000000..8f1c807
--- /dev/null
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.backup
+
+import java.nio.file.Files
+import java.util
+
+import com.google.common.base.Objects
+import org.apache.commons.io.FileUtils
+import org.apache.kudu.ColumnSchema.{ColumnSchemaBuilder, CompressionAlgorithm, Encoding}
+import org.apache.kudu.client.PartitionSchema.HashBucketSchema
+import org.apache.kudu.client.{CreateTableOptions, KuduTable, PartialRow, PartitionSchema, TestUtils}
+import org.apache.kudu.{ColumnSchema, Schema, Type}
+import org.apache.kudu.spark.kudu._
+import org.apache.kudu.util.DecimalUtil
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FunSuite, Matchers}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+@RunWith(classOf[JUnitRunner])
+class TestKuduBackup extends FunSuite with TestContext with Matchers {
+  val log: Logger = LoggerFactory.getLogger(getClass)
+
+  test("Simple Backup and Restore") {
+    insertRows(100) // Insert data into the default test table.
+
+    backupAndRestore(tableName)
+
+    val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore", List("key"))
+    assert(rdd.collect.length == 100)
+
+    val tA = kuduClient.openTable(tableName)
+    val tB = kuduClient.openTable(s"$tableName-restore")
+    assertEquals(tA.getNumReplicas, tB.getNumReplicas)
+    assertTrue(schemasMatch(tA.getSchema, tB.getSchema))
+    assertTrue(partitionSchemasMatch(tA.getPartitionSchema, tB.getPartitionSchema))
+  }
+
+  test("Random Backup and Restore") {
+    Random.javaRandomToRandom(TestUtils.getRandom)
+
+    val table = createRandomTable()
+    val tableName = table.getName
+    loadRandomData(table)
+
+    backupAndRestore(tableName)
+
+    val backupRows = kuduContext.kuduRDD(ss.sparkContext, tableName).collect
+    val restoreRows = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore").collect
+    assertEquals(backupRows.length, restoreRows.length)
+
+    val tA = kuduClient.openTable(tableName)
+    val tB = kuduClient.openTable(s"$tableName-restore")
+    assertEquals(tA.getNumReplicas, tB.getNumReplicas)
+    assertTrue(schemasMatch(tA.getSchema, tB.getSchema))
+    assertTrue(partitionSchemasMatch(tA.getPartitionSchema, tB.getPartitionSchema))
+  }
+
+  // TODO: Move to a Schema equals/equivalent method.
+  def schemasMatch(before: Schema, after: Schema): Boolean = {
+    if (before eq after) return true
+    if (before.getColumns.size != after.getColumns.size) return false
+    (0 until before.getColumns.size).forall { i =>
+      columnsMatch(before.getColumnByIndex(i), after.getColumnByIndex(i))
+    }
+  }
+
+  // TODO: Move to a ColumnSchema equals/equivalent method.
+  def columnsMatch(before: ColumnSchema, after: ColumnSchema): Boolean = {
+    if (before eq after) return true
+    Objects.equal(before.getName, after.getName) &&
+      Objects.equal(before.getType, after.getType) &&
+      Objects.equal(before.isKey, after.isKey) &&
+      Objects.equal(before.isNullable, after.isNullable) &&
+      defaultValuesMatch(before.getDefaultValue, after.getDefaultValue) &&
+      Objects.equal(before.getDesiredBlockSize, after.getDesiredBlockSize) &&
+      Objects.equal(before.getEncoding, after.getEncoding) &&
+      Objects.equal(before.getCompressionAlgorithm, after.getCompressionAlgorithm) &&
+      Objects.equal(before.getTypeAttributes, after.getTypeAttributes)
+  }
+
+  // Special handling because default values can be byte arrays, whose contents
+  // are not compared by Guava's Objects.equal.
+  // See https://github.com/google/guava/issues/1425
+  def defaultValuesMatch(before: Any, after: Any): Boolean = {
+    if (before.isInstanceOf[Array[Byte]] && after.isInstanceOf[Array[Byte]]) {
+      util.Objects.deepEquals(before, after)
+    } else {
+      Objects.equal(before, after)
+    }
+  }
+
+  // TODO: Move to a PartitionSchema equals/equivalent method.
+  def partitionSchemasMatch(before: PartitionSchema, after: PartitionSchema): Boolean = {
+    if (before eq after) return true
+    val beforeBuckets = before.getHashBucketSchemas.asScala
+    val afterBuckets = after.getHashBucketSchemas.asScala
+    if (beforeBuckets.size != afterBuckets.size) return false
+    val hashBucketsMatch = (0 until beforeBuckets.size).forall { i =>
+      hashBucketSchemasMatch(beforeBuckets(i), afterBuckets(i))
+    }
+    hashBucketsMatch &&
+      Objects.equal(before.getRangeSchema.getColumnIds, after.getRangeSchema.getColumnIds)
+  }
+
+  def hashBucketSchemasMatch(before: HashBucketSchema, after: HashBucketSchema): Boolean = {
+    if (before eq after) return true
+    Objects.equal(before.getColumnIds, after.getColumnIds) &&
+      Objects.equal(before.getNumBuckets, after.getNumBuckets) &&
+      Objects.equal(before.getSeed, after.getSeed)
+  }
+
+  // TODO: Move to a test utility in kudu-client since it's generally useful.
+  def createRandomTable(): KuduTable = {
+    val columnCount = Random.nextInt(50) + 1 // At least one column.
+    val keyCount = Random.nextInt(columnCount) + 1 // At least one key.
+
+    val types = Type.values()
+    val keyTypes = types.filter { t => !Array(Type.BOOL, Type.FLOAT, Type.DOUBLE).contains(t)}
+    val compressions = CompressionAlgorithm.values().filter(_ != CompressionAlgorithm.UNKNOWN)
+    val blockSizes = Array(0, 4096, 524288, 1048576) // Default, min, middle, max.
+
+    val columns = (0 until columnCount).map { i =>
+      val key = i < keyCount
+      val t = if (key) {
+        keyTypes(Random.nextInt(keyTypes.length))
+      } else {
+        types(Random.nextInt(types.length))
+      }
+      val precision = Random.nextInt(DecimalUtil.MAX_DECIMAL_PRECISION) + 1
+      val scale = Random.nextInt(precision)
+      val typeAttributes = DecimalUtil.typeAttributes(precision, scale)
+      val nullable = Random.nextBoolean() && !key
+      val compression = compressions(Random.nextInt(compressions.length))
+      val blockSize = blockSizes(Random.nextInt(blockSizes.length))
+      val encodings = t match {
+        case Type.INT8 | Type.INT16 | Type.INT32 | Type.INT64 | Type.UNIXTIME_MICROS =>
+          Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.BIT_SHUFFLE, Encoding.RLE)
+        case Type.FLOAT | Type.DOUBLE | Type.DECIMAL =>
+          Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.BIT_SHUFFLE)
+        case Type.STRING | Type.BINARY =>
+          Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.PREFIX_ENCODING, Encoding.DICT_ENCODING)
+        case Type.BOOL =>
+          Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.RLE)
+        case _ => throw new IllegalArgumentException(s"Unsupported type $t")
+      }
+      val encoding = encodings(Random.nextInt(encodings.length))
+
+      val builder = new ColumnSchemaBuilder(s"${t.getName}-$i", t)
+        .key(key)
+        .nullable(nullable)
+        .compressionAlgorithm(compression)
+        .desiredBlockSize(blockSize)
+        .encoding(encoding)
+      // Add type attributes to decimal columns.
+      if (t == Type.DECIMAL) {
+        builder.typeAttributes(typeAttributes)
+      }
+      // Half the columns have defaults.
+      if (Random.nextBoolean()) {
+        val defaultValue =
+          t match {
+            case Type.BOOL => Random.nextBoolean()
+            case Type.INT8 => Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]
+            case Type.INT16 => Random.nextInt(Short.MaxValue).asInstanceOf[Short]
+            case Type.INT32 => Random.nextInt()
+            case Type.INT64 | Type.UNIXTIME_MICROS => Random.nextLong()
+            case Type.FLOAT => Random.nextFloat()
+            case Type.DOUBLE => Random.nextDouble()
+            case Type.DECIMAL =>
+              DecimalUtil.minValue(typeAttributes.getPrecision, typeAttributes.getScale)
+            case Type.STRING => Random.nextString(Random.nextInt(100))
+            case Type.BINARY => Random.nextString(Random.nextInt(100)).getBytes()
+            case _ => throw new IllegalArgumentException(s"Unsupported type $t")
+          }
+        builder.defaultValue(defaultValue)
+      }
+      builder.build()
+    }
+    val keyColumns = columns.filter(_.isKey)
+
+    val schema = new Schema(columns.asJava)
+
+    val options = new CreateTableOptions().setNumReplicas(1)
+    // Add hash partitioning (Max out at 3 levels to avoid being excessive).
+    val hashPartitionLevels = Random.nextInt(Math.min(keyCount, 3))
+    (0 to hashPartitionLevels).foreach { level =>
+      val hashColumn = keyColumns(level)
+      val hashBuckets = Random.nextInt(8) + 2 // Minimum of 2 hash buckets.
+      val hashSeed = Random.nextInt()
+      options.addHashPartitions(List(hashColumn.getName).asJava, hashBuckets, hashSeed)
+    }
+    val hasRangePartition = Random.nextBoolean() && keyColumns.exists(_.getType == Type.INT64)
+    if (hasRangePartition) {
+      val rangeColumn = keyColumns.filter(_.getType == Type.INT64).head
+      options.setRangePartitionColumns(List(rangeColumn.getName).asJava)
+      val splits = Random.nextInt(8)
+      val used = new util.ArrayList[Long]()
+      var i = 0
+      while (i < splits) {
+        val split = schema.newPartialRow()
+        val value = Random.nextLong()
+        if (!used.contains(value)) {
+          used.add(value)
+          split.addLong(rangeColumn.getName, value)
+          options.addSplitRow(split)
+          i = i + 1
+        }
+      }
+    }
+
+    val name = s"random-${System.currentTimeMillis()}"
+    kuduClient.createTable(name, schema, options)
+  }
+
+  // TODO: Add updates and deletes when incremental backups are supported.
+  def loadRandomData(table: KuduTable): IndexedSeq[PartialRow] = {
+    val rowCount = Random.nextInt(200)
+
+    val kuduSession = kuduClient.newSession()
+    (0 to rowCount).map { i =>
+      val upsert = table.newUpsert()
+      val row = upsert.getRow
+      table.getSchema.getColumns.asScala.foreach { col =>
+        // Set nullable columns to null ~10% of the time.
+        if (col.isNullable && Random.nextInt(10) == 0) {
+          row.setNull(col.getName)
+        }
+        // Use the column default value ~10% of the time.
+        if (col.getDefaultValue != null && !col.isKey && Random.nextInt(10) == 0) {
+          // Use the default value.
+        } else {
+          col.getType match {
+            case Type.BOOL =>
+              row.addBoolean(col.getName, Random.nextBoolean())
+            case Type.INT8 =>
+              row.addByte(col.getName, Random.nextInt(Byte.MaxValue).asInstanceOf[Byte])
+            case Type.INT16 =>
+              row.addShort(col.getName, Random.nextInt(Short.MaxValue).asInstanceOf[Short])
+            case Type.INT32 =>
+              row.addInt(col.getName, Random.nextInt())
+            case Type.INT64 | Type.UNIXTIME_MICROS =>
+              row.addLong(col.getName, Random.nextLong())
+            case Type.FLOAT =>
+              row.addFloat(col.getName, Random.nextFloat())
+            case Type.DOUBLE =>
+              row.addDouble(col.getName, Random.nextDouble())
+            case Type.DECIMAL =>
+              val attributes = col.getTypeAttributes
+              val max = DecimalUtil.maxValue(attributes.getPrecision, attributes.getScale)
+              row.addDecimal(col.getName, max)
+            case Type.STRING =>
+              row.addString(col.getName, Random.nextString(Random.nextInt(100)))
+            case Type.BINARY =>
+              row.addBinary(col.getName, Random.nextString(Random.nextInt(100)).getBytes())
+            case _ =>
+              throw new IllegalArgumentException(s"Unsupported type ${col.getType}")
+          }
+        }
+      }
+      kuduSession.apply(upsert)
+      row
+    }
+  }
+
+  def backupAndRestore(tableName: String): Unit = {
+    val dir = Files.createTempDirectory("backup")
+    val path = dir.toUri.toString
+
+    val backupOptions = new KuduBackupOptions(Seq(tableName), path, miniCluster.getMasterAddresses)
+    KuduBackup.run(backupOptions, ss)
+
+    val restoreOptions = new KuduRestoreOptions(Seq(tableName), path, miniCluster.getMasterAddresses)
+    KuduRestore.run(restoreOptions, ss)
+
+    FileUtils.deleteDirectory(dir.toFile)
+  }
+}
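
A small aside on defaultValuesMatch above, illustrating why byte-array defaults go through
java.util.Objects.deepEquals rather than Guava's Objects.equal (the values below are
illustrative only):

    import com.google.common.base.Objects

    val a = Array[Byte](1, 2, 3)
    val b = Array[Byte](1, 2, 3)
    Objects.equal(a, b)                   // false: arrays are compared by reference
    java.util.Objects.deepEquals(a, b)    // true: array contents are compared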

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Type.java b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
index d960db9..c3b2a91 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Type.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
@@ -190,4 +190,13 @@ public enum Type {
     }
   }
 
+  public static Type getTypeForName(String name) {
+    for (Type t : values()) {
+      if (t.name().equals(name)) {
+        return t;
+      }
+    }
+    throw new IllegalArgumentException("The provided name doesn't map to any known type: " + name);
+  }
+
 }
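
For illustration only (not part of the commit), the new lookup is the inverse of Enum.name(),
which is how a column type persisted as a string can be turned back into a Type:

    import org.apache.kudu.Type

    val t: Type = Type.getTypeForName(Type.INT32.name())  // Type.INT32
    // An unknown name raises IllegalArgumentException rather than returning null.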

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index c4c8540..ccb17b1 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -54,10 +54,7 @@ public class BaseKuduTest {
   // to complete, etc.
   protected static final int DEFAULT_SLEEP = 50000;
 
-  // Currently not specifying a seed since we want a random behavior when running tests that
-  // restart tablet servers. Would be nice to have the same kind of facility that C++ has that dumps
-  // the seed it picks so that you can re-run tests with it.
-  private static final Random randomForTSRestart = new Random();
+  private static final Random randomForTSRestart = TestUtils.getRandom();
 
   protected static MiniKuduCluster miniCluster;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
index 5ca26a4..a3196d6 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.net.URLDecoder;
+import java.util.Random;
 import java.util.Set;
 
 import com.google.common.base.Joiner;
@@ -44,6 +45,8 @@ public class TestUtils {
 
   private static final String BIN_DIR_PROP = "binDir";
 
+  private static final String TEST_RANDOM_SEED_PROP = "testRandomSeed";
+
   /**
    * Return the path portion of a file URL, after decoding the escaped
    * components. This fixes issues when trying to build within a
@@ -211,4 +214,21 @@ public class TestUtils {
     replicaBuilder.setRole(role);
     return replicaBuilder;
   }
+
+  /**
+   * Get an instance of Random for use in tests and logs the seed used.
+   *
+   * Uses a default seed of System.currentTimeMillis() with the option to
+   * override via the testRandomSeed system property.
+   */
+  public static Random getRandom() {
+    // First check the system property.
+    long seed = System.currentTimeMillis();
+    if (System.getProperty(TEST_RANDOM_SEED_PROP) != null) {
+      seed = Long.parseLong(System.getProperty(TEST_RANDOM_SEED_PROP));
+      LOG.info("System property {} is defined. Overriding random seed to {}.", TEST_RANDOM_SEED_PROP, seed);
+    }
+    LOG.info("Using random seed: {}", seed);
+    return new Random(seed);
+  }
 }
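
As a usage sketch (names and the seed value below are illustrative, not part of the commit):
because getRandom() logs the seed it picks, a failing randomized test can be replayed by pinning
that seed through the testRandomSeed system property on the test JVM, e.g.
-DtestRandomSeed=1529955943000, assuming the build forwards -D properties to the forked tests.

    import org.apache.kudu.client.TestUtils

    // Hypothetical test wiring: wrap the seeded java.util.Random in a Scala Random
    // so the rest of the test draws reproducible values.
    val rng: scala.util.Random = scala.util.Random.javaRandomToRandom(TestUtils.getRandom)
    val buckets = rng.nextInt(8) + 2  // e.g. a reproducible "random" bucket count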

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
index f290043..62b2c72 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
@@ -100,11 +100,23 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
 
     kuduContext = new KuduContext(miniCluster.getMasterAddresses, ss.sparkContext)
 
-    val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
-                                               .setNumReplicas(1)
+    val bottom = schema.newPartialRow() // Unbounded.
+    val middle = schema.newPartialRow()
+    middle.addInt("key", 50)
+    val top = schema.newPartialRow() // Unbounded.
+
+    val tableOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .addRangePartition(bottom, middle)
+      .addRangePartition(middle, top)
+      .setNumReplicas(1)
     table = kuduClient.createTable(tableName, schema, tableOptions)
 
-    kuduClient.createTable(simpleTableName, simpleSchema, tableOptions)
+    val simpleTableOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+
+    kuduClient.createTable(simpleTableName, simpleSchema, simpleTableOptions)
   }
 
   override def afterAll() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 28c1876..afd7c3a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -90,6 +90,7 @@
         <!--  This is a workaround for
               https://github.com/google/protobuf/issues/4109 -->
         <protoc.version>3.5.1-1</protoc.version>
+        <scopt.version>3.7.0</scopt.version>
         <slf4j.version>1.7.25</slf4j.version>
         <sparkavro.version>4.0.0</sparkavro.version>
         <yetus.version>0.7.0</yetus.version>

http://git-wip-us.apache.org/repos/asf/kudu/blob/148a0c7b/java/settings.gradle
----------------------------------------------------------------------
diff --git a/java/settings.gradle b/java/settings.gradle
index 74367ff..6a60213 100644
--- a/java/settings.gradle
+++ b/java/settings.gradle
@@ -19,6 +19,7 @@
 // Mainly we just define what subprojects are in the build.
 
 rootProject.name = "kudu-parent"
+include "kudu-backup"
 include "kudu-client"
 include "kudu-client-tools"
 include "kudu-flume-sink"

