// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
= Apache Spark Quickstart
Below is a brief example using Apache Spark to load, query, and modify a real
data set in Apache Kudu.
== Start the Kudu Quickstart Environment
See the Apache Kudu
link:https://kudu.apache.org/docs/quickstart.html[quickstart documentation]
to set up and run the Kudu quickstart environment.
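In short, starting the environment looks something like the following sketch
(assuming Docker and Docker Compose are installed; the linked guide is
authoritative and covers platform-specific details):
[source,bash]
----
git clone https://github.com/apache/kudu
cd kudu
# Bind the cluster to an IP address reachable from the host.
export KUDU_QUICKSTART_IP=$(ifconfig | grep "inet " | grep -Fv 127.0.0.1 | awk '{print $2}' | tail -1)
docker-compose -f docker/quickstart.yml up -d
----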
== Install Spark
Install Apache Spark on your host machine by following the Apache Spark
link:https://spark.apache.org/docs/latest/#downloading[installation documentation].
NOTE: If you are on a Mac, you can use link:https://brew.sh/[Homebrew]
and install it with:
[source,bash]
----
brew install apache-spark
----
== Download the data
To practice some typical operations with Kudu and Spark, we'll use the
link:https://data.sfgov.org/Transportation/Raw-AVL-GPS-data/5fk7-ivit/data[San Francisco MTA GPS dataset].
This dataset contains raw location data transmitted periodically from sensors
installed on the buses in the SF MTA's fleet.
1. Download the sample data.
+
The SF MTA's site is often a bit slow, so we've mirrored a sample CSV file from the
dataset at http://kudu-sample-data.s3.amazonaws.com/sfmtaAVLRawData01012013.csv.gz
+
The original dataset uses DOS-style line endings, so we'll convert them to
UNIX-style with `tr` while decompressing the file.
+
There is also a missing line break after the header, so we add it using `sed`.
+
[source,bash]
----
wget http://kudu-sample-data.s3.amazonaws.com/sfmtaAVLRawData01012013.csv.gz
gunzip -c sfmtaAVLRawData01012013.csv.gz | tr -d '\r' | \
sed 's/PREDICTABLE/PREDICTABLE\n/g' > sfmtaAVLRawData01012013.csv
----
== Run the spark-shell with kudu-spark
Run the `spark-shell` with the `kudu-spark` package:
[source,bash]
----
spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.16.0
----
NOTE: The examples below assume you are in the `spark-shell` with the
`kudu-spark` package.
NOTE: The examples below use `:paste` to support multi-line code.
As the `spark-shell` prompts, press `CTRL + D` after pasting each block to run it.
== Load and prepare the CSV data
1. Read the CSV data.
+
Read the plain text data into a Spark DataFrame and print the interpreted schema.
+
[source,scala]
----
:paste
val sfmta_raw = spark.sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("sfmtaAVLRawData01012013.csv")
sfmta_raw.printSchema
sfmta_raw.createOrReplaceTempView("sfmta_raw")
spark.sql("SELECT count(*) FROM sfmta_raw").show()
spark.sql("SELECT * FROM sfmta_raw LIMIT 5").show()
----
2. Prepare the data to load into Kudu.
+
To prepare the data we will:
+
* Convert the `REPORT_TIME` from a string into a timestamp.
* Mark the primary key columns, `REPORT_TIME` and `VEHICLE_TAG`, as non-nullable.
+
[source,scala]
----
:paste
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
def setNotNull(df: DataFrame, columns: Seq[String]): DataFrame = {
  val schema = df.schema
  // Rewrite the StructField for each of the specified columns as non-nullable.
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if columns.contains(c) =>
      StructField(c, t, nullable = false, m)
    case y: StructField => y
  })
  // Apply the new schema to the DataFrame.
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
val sfmta_time = sfmta_raw
  .withColumn("REPORT_TIME", to_timestamp($"REPORT_TIME", "MM/dd/yyyy HH:mm:ss"))
val sfmta_prep = setNotNull(sfmta_time, Seq("REPORT_TIME", "VEHICLE_TAG"))
sfmta_prep.printSchema
sfmta_prep.createOrReplaceTempView("sfmta_prep")
spark.sql("SELECT count(*) FROM sfmta_prep").show()
spark.sql("SELECT * FROM sfmta_prep LIMIT 5").show()
----
== Load and prepare the Kudu table
1. Create a new Kudu table
+
Create a Kudu table with 3 replicas and 4 hash partitions using the schema
defined by the `sfmta_prep` DataFrame.
+
[source,scala]
----
:paste
import collection.JavaConverters._
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("localhost:7051,localhost:7151,localhost:7251", spark.sparkContext)
// Delete the table if it already exists.
if (kuduContext.tableExists("sfmta_kudu")) {
  kuduContext.deleteTable("sfmta_kudu")
}
kuduContext.createTable("sfmta_kudu", sfmta_prep.schema,
  /* primary key */ Seq("REPORT_TIME", "VEHICLE_TAG"),
  new CreateTableOptions()
    .setNumReplicas(3)
    .addHashPartitions(List("VEHICLE_TAG").asJava, 4))
----
+
NOTE: The table is deleted and recreated if it already exists.
2. Load the Kudu table.
+
Insert the prepared data into the Kudu table using the `kuduContext`.
+
[source,scala]
----
:paste
kuduContext.insertRows(sfmta_prep, "sfmta_kudu")
// Create a DataFrame that points to the Kudu table we want to query.
val sfmta_kudu = spark.read
  .option("kudu.master", "localhost:7051,localhost:7151,localhost:7251")
  .option("kudu.table", "sfmta_kudu")
  // We need to use leader_only because Kudu on Docker currently doesn't
  // support snapshot scans due to `--use_hybrid_clock=false`.
  .option("kudu.scanLocality", "leader_only")
  .format("kudu").load
sfmta_kudu.createOrReplaceTempView("sfmta_kudu")
spark.sql("SELECT count(*) FROM sfmta_kudu").show()
spark.sql("SELECT * FROM sfmta_kudu LIMIT 5").show()
----
== Read and Modify Data
Now that the data is stored in Kudu, you can run queries against it.
The following query finds the data point containing the highest recorded vehicle speed.
[source,scala]
----
spark.sql("SELECT * FROM sfmta_kudu ORDER BY speed DESC LIMIT 1").show()
----
The output should look something like this:
----
+-------------+-------------+--------------------+-------------------+-------------------+---------+
| report_time | vehicle_tag | longitude | latitude | speed | heading |
+-------------+-------------+--------------------+-------------------+-------------------+---------+
| 1357022342 | 5411 | -122.3968811035156 | 37.76665878295898 | 68.33300018310547 | 82 |
+-------------+-------------+--------------------+-------------------+-------------------+---------+
----
With a quick link:https://www.google.com/search?q=122.3968811035156W+37.76665878295898N[Google search]
we can see that this bus was traveling east on 16th Street at 68 MPH.
At first glance, this seems unlikely to be true. Suppose we do some research
and find that this bus's sensor equipment was broken, so we decide to
remove its data. With Kudu, this is easy to correct using Spark:
[source,scala]
----
spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
val toDelete = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411")
kuduContext.deleteRows(toDelete, "sfmta_kudu")
spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show()
----
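Deleting is one option; if the readings could be repaired instead, rows can
also be updated in place with `kuduContext.updateRows`, which matches existing
rows on the primary key. A minimal sketch follows; it is an alternative to the
delete above (not a follow-up, since the delete removes the rows), and the
zeroed-out speed is purely illustrative:
[source,scala]
----
:paste
import org.apache.spark.sql.functions.lit
// Illustrative alternative: overwrite the suspect speeds instead of deleting
// the rows. The column name's case must match the Kudu table's schema.
val corrected = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411")
  .withColumn("SPEED", lit(0.0))
kuduContext.updateRows(corrected, "sfmta_kudu")
----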
== Next steps
The above example showed how to load, query, and mutate a static dataset with
Spark and Kudu. The real power of Kudu, however, is the ability to ingest and
mutate data in a streaming fashion.
As an exercise to learn the Kudu programmatic APIs, try implementing a program
that uses the link:http://www.nextbus.com/xmlFeedDocs/NextBusXMLFeed.pdf[SFMTA XML data feed]
to ingest this same dataset in real time into the Kudu table.
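One possible shape for such a program is sketched below. The XML parsing is
the exercise and is left as a hypothetical `fetchLatestLocations` helper; the
relevant Kudu piece is `upsertRows`, which matches on the primary key, so
points reported more than once are handled safely.
[source,scala]
----
:paste
import org.apache.spark.sql.DataFrame
// Hypothetical helper: fetch the current vehicle locations from the SFMTA
// feed and parse the XML into a DataFrame matching sfmta_kudu's schema.
// Implementing this parser is the exercise.
def fetchLatestLocations(): DataFrame = ???
// Poll the feed and upsert each batch. Upserts match on the primary key
// (REPORT_TIME, VEHICLE_TAG), so re-reported points simply overwrite.
while (true) {
  kuduContext.upsertRows(fetchLatestLocations(), "sfmta_kudu")
  Thread.sleep(60 * 1000) // poll roughly once a minute
}
----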
== Help
If you have questions, issues, or feedback on this quickstart guide, please reach out to the
link:https://kudu.apache.org/community.html[Apache Kudu community].