kudu-commits mailing list archives

From granthe...@apache.org
Subject [kudu] branch master updated: [examples] Add a complete Spark quickstart example
Date Sun, 14 Jul 2019 23:27:15 GMT
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4c345  [examples] Add a complete Spark quickstart example
5a4c345 is described below

commit 5a4c345ecc791612ee8b4a0fee6cc6bc352afbbd
Author: Grant Henke <granthenke@apache.org>
AuthorDate: Fri Jul 12 12:45:53 2019 -0500

    [examples] Add a complete Spark quickstart example
    
    This patch adds a brief example using Apache Spark to
    load, query, and modify a real data set in Apache Kudu.
    
    It also updates the quickstart docs to point to the
    examples directory.
    
    Change-Id: Id30a9827d96197c50bce844ac1250033c7003c9c
    Reviewed-on: http://gerrit.cloudera.org:8080/13852
    Reviewed-by: Andrew Wong <awong@cloudera.com>
    Tested-by: Grant Henke <granthenke@apache.org>
---
 docs/quickstart.adoc                  |  11 +-
 examples/quickstart/spark/README.adoc | 208 ++++++++++++++++++++++++++++++++++
 2 files changed, 217 insertions(+), 2 deletions(-)

diff --git a/docs/quickstart.adoc b/docs/quickstart.adoc
index 3f775a8..e02506b 100644
--- a/docs/quickstart.adoc
+++ b/docs/quickstart.adoc
@@ -109,9 +109,9 @@ kudu cluster ksck localhost:7051,localhost:7151,localhost:7251
 Note: Setting `KUDU_USER_NAME=kudu` simplifies using Kudu from various user
 accounts in a non-secure environment.
 
-== Running an Example
+== Running a Brief Example
 
-Now that a kudu cluster is up and running, examples and integrations can be
+Now that a Kudu cluster is up and running, examples and integrations can be
 run against the cluster. The commands below run the `java-example` against
 the quickstart cluster:
 
@@ -123,6 +123,13 @@ mvn package
 java -DkuduMasters=localhost:7051,localhost:7151,localhost:7251 -jar target/kudu-java-example-1.0-SNAPSHOT.jar
 ----
 
+== More Examples
+
+More complete walkthroughs using the quickstart Kudu cluster can be found in the
+`examples/quickstart` directory. For convenience, you can browse them on
+link:https://github.com/apache/kudu/tree/master/examples/quickstart[GitHub].
+
+
 == Destroying the Cluster
 
 Once you are done with the quickstart cluster, you can shut it down in a couple of ways.
diff --git a/examples/quickstart/spark/README.adoc b/examples/quickstart/spark/README.adoc
new file mode 100644
index 0000000..42953fe
--- /dev/null
+++ b/examples/quickstart/spark/README.adoc
@@ -0,0 +1,208 @@
+= Apache Spark Quickstart
+
+Below is a brief example using Apache Spark to load, query, and modify a real
+data set in Apache Kudu.
+
+== Start the Kudu Quickstart
+
+See the Apache Kudu
+link:https://kudu.apache.org/docs/quickstart.html[quickstart documentation]
+to set up and run the Kudu quickstart environment.
+
+== Install Spark
+
+Install Apache Spark on your host machine by following the Apache Spark
+link:https://spark.apache.org/docs/latest/#downloading[installation documentation].
+
+NOTE: If you are on a Mac you can use link:https://brew.sh/[Homebrew]
+and install with:
+
+[source,bash]
+----
+brew install apache-spark
+----
+
+== Download the data
+
+To practice some typical operations with Kudu and Spark, we'll use the
+link:https://data.sfgov.org/Transportation/Raw-AVL-GPS-data/5fk7-ivit/data[San Francisco MTA GPS dataset].
+This dataset contains raw location data transmitted periodically from sensors
+installed on the buses in the SF MTA's fleet.
+
+1. Download the sample data.
++
+The SF MTA's site is often a bit slow, so we've mirrored a sample CSV file from the
+dataset at http://kudu-sample-data.s3.amazonaws.com/sfmtaAVLRawData01012013.csv.gz
++
+The original dataset uses DOS-style line endings, so we'll convert it to
+UNIX-style using `tr` while decompressing it.
++
+There is also a missing line break after the header, so we add it using `sed`.
++
+[source,bash]
+----
+wget http://kudu-sample-data.s3.amazonaws.com/sfmtaAVLRawData01012013.csv.gz
+gunzip -c sfmtaAVLRawData01012013.csv.gz | tr -d '\r' | \
+  sed 's/PREDICTABLE/PREDICTABLE\n/g' > sfmtaAVLRawData01012013.csv
+----
+
+== Run the spark-shell with kudu-spark
+
+Run the `spark-shell` with the `kudu-spark` package:
+
+[source,bash]
+----
+spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
+----
+
+NOTE: The examples below assume you are in the `spark-shell` with the
+`kudu-spark` package.
+
+NOTE: The examples below use `:paste` to support multiline syntax.
+As the `spark-shell` prompt notes, press `CTRL + D` after pasting the code to run it.
+
+== Load and prepare the CSV data
+
+1. Read the CSV data.
++
+Read the plain text data into a Spark DataFrame and print the interpreted schema.
++
+[source,scala]
+----
+:paste
+val sfmta_raw = spark.sqlContext.read.format("csv")
+  .option("header", "true")
+  .option("inferSchema", "true")
+  .load("sfmtaAVLRawData01012013.csv")
+
+sfmta_raw.printSchema
+sfmta_raw.createOrReplaceTempView("sfmta_raw")
+spark.sql("SELECT count(*) FROM sfmta_raw").show()
+spark.sql("SELECT * FROM sfmta_raw LIMIT 5").show()
+----
+
+2. Prepare the data to load into Kudu.
++
+To prepare the data we will:
++
+* Convert the `REPORT_TIME` from a string into a timestamp.
+* Mark the primary key columns, `REPORT_TIME` and `VEHICLE_TAG`, as non-nullable.
++
+[source,scala]
+----
+:paste
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.DataFrame
+def setNotNull(df: DataFrame, columns: Seq[String]): DataFrame = {
+  val schema = df.schema
+  // Modify the [[StructField]] for the specified columns.
+  val newSchema = StructType(schema.map {
+    case StructField(c, t, _, m) if columns.contains(c) => StructField(c, t, nullable = false, m)
+    case y: StructField => y
+  })
+  // Apply new schema to the DataFrame
+  df.sqlContext.createDataFrame(df.rdd, newSchema)
+}
+val sftmta_time = sfmta_raw
+  .withColumn("REPORT_TIME", to_timestamp($"REPORT_TIME", "MM/dd/yyyy HH:mm:ss"))
+val sftmta_prep = setNotNull(sftmta_time, Seq("REPORT_TIME", "VEHICLE_TAG"))
+sftmta_prep.printSchema
+sftmta_prep.createOrReplaceTempView("sftmta_prep")
+spark.sql("SELECT count(*) FROM sftmta_prep").show()
+spark.sql("SELECT * FROM sftmta_prep LIMIT 5").show()
+----
+
+== Load and prepare the Kudu table
+
+1. Create a new Kudu table
++
+Create a Kudu table with 3 replicas and 4 hash partitions using the schema
+defined by the `sftmta_prep` DataFrame.
++
+[source,scala]
+----
+:paste
+import collection.JavaConverters._
+import org.apache.kudu.client._
+import org.apache.kudu.spark.kudu._
+val kuduContext = new KuduContext("localhost:7051,localhost:7151,localhost:7251", spark.sparkContext)
+
+// Delete the table if it already exists.
+if (kuduContext.tableExists("sfmta_kudu")) {
+  kuduContext.deleteTable("sfmta_kudu")
+}
+
+kuduContext.createTable("sfmta_kudu", sftmta_prep.schema,
+  /* primary key */ Seq("REPORT_TIME", "VEHICLE_TAG"),
+  new CreateTableOptions()
+    .setNumReplicas(3)
+    .addHashPartitions(List("VEHICLE_TAG").asJava, 4))
+----
++
+NOTE: The table is deleted and recreated if it already exists.
+
+2. Load the Kudu table.
++
+Insert the prepared data into the Kudu table using the `kuduContext`.
++
+[source,scala]
+----
+:paste
+kuduContext.insertRows(sftmta_prep, "sfmta_kudu")
+// Create a DataFrame that points to the Kudu table we want to query.
+val sfmta_kudu = spark.read
+  .option("kudu.master", "localhost:7051,localhost:7151,localhost:7251")
+  .option("kudu.table", "sfmta_kudu")
+  // We need to use leader_only because Kudu on Docker currently doesn't
+  // support snapshot scans due to `--use_hybrid_clock=false`.
+  .option("kudu.scanLocality", "leader_only")
+  .format("kudu").load
+sfmta_kudu.createOrReplaceTempView("sfmta_kudu")
+spark.sql("SELECT count(*) FROM sfmta_kudu").show()
+spark.sql("SELECT * FROM sfmta_kudu LIMIT 5").show()
+----
+
+== Read and Modify Data
+
+Now that the data is stored in Kudu, you can run queries against it.
+The following query finds the data point containing the highest recorded vehicle speed.
+
+[source,scala]
+----
+spark.sql("SELECT * FROM sfmta_kudu ORDER BY speed DESC LIMIT 1").show()
+----
+
+The output should look something like this:
+
+----
++-------------+-------------+--------------------+-------------------+-------------------+---------+
+| report_time | vehicle_tag | longitude          | latitude          | speed             | heading |
++-------------+-------------+--------------------+-------------------+-------------------+---------+
+| 1357022342  | 5411        | -122.3968811035156 | 37.76665878295898 | 68.33300018310547 | 82      |
++-------------+-------------+--------------------+-------------------+-------------------+---------+
+----
+
+With a quick
+link:https://www.google.com/search?q=122.3968811035156W+37.76665878295898N[Google search]
+we can see that this bus was traveling east on 16th Street at 68 MPH.
+At first glance, this seems unlikely to be true. Suppose that after some
+research we find this bus's sensor equipment was broken, and we decide to
+remove its data. With Kudu this is easy to correct using Spark:
+
+[source,scala]
+----
+spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
+val toDelete = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411")
+kuduContext.deleteRows(toDelete, "sfmta_kudu")
+spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
+----
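+
+If the readings turned out to be miscalibrated rather than invalid, we could
+correct them in place instead of deleting them. The sketch below is
+hypothetical: the 0.5 scale factor is invented purely for illustration, but
+`kuduContext.updateRows` is the standard way to update existing Kudu rows
+from a DataFrame:
+
+[source,scala]
+----
+:paste
+// Hypothetical correction: rescale the broken sensor's speed readings.
+// The 0.5 factor is an assumption for illustration only.
+val corrected = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411")
+  .withColumn("SPEED", $"SPEED" * 0.5)
+kuduContext.updateRows(corrected, "sfmta_kudu")
+----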
+
+== Next steps
+
+The above example showed how to load, query, and mutate a static dataset with
+Spark and Kudu. The real power of Kudu, however, is the ability to ingest and
+mutate data in a streaming fashion.
+
+As an exercise to learn the Kudu programmatic APIs, try implementing a program
+that uses the link:http://www.nextbus.com/xmlFeedDocs/NextBusXMLFeed.pdf[SFMTA XML data feed]
+to ingest this same dataset in real time into the Kudu table.
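+
+To make the exercise concrete, here is a minimal, hypothetical skeleton of
+such a program using Spark Structured Streaming (`foreachBatch` requires
+Spark 2.4 or later). The socket source, its endpoint, and the
+`parseToSchema` helper are placeholders for a real SFMTA feed parser, not
+part of the Kudu API:
+
+[source,scala]
+----
+:paste
+import org.apache.spark.sql.DataFrame
+
+// Hypothetical parser stub: a real implementation would map raw feed records
+// to the REPORT_TIME/VEHICLE_TAG/... columns of the sfmta_kudu table.
+def parseToSchema(batch: DataFrame): DataFrame = batch
+
+// Placeholder source: substitute a process that relays the SFMTA feed.
+val stream = spark.readStream
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", "9999")
+  .load()
+
+// Upsert each micro-batch into the existing Kudu table.
+val query = stream.writeStream
+  .foreachBatch { (batch: DataFrame, batchId: Long) =>
+    kuduContext.upsertRows(parseToSchema(batch), "sfmta_kudu")
+  }
+  .start()
+----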

