| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| = Apache Spark Quickstart |
| |
| Below is a brief example using Apache Spark to load, query, and modify a real |
| data set in Apache Kudu. |
| |
| == Start the Kudu Quickstart Environment |
| |
| See the Apache Kudu |
| link:https://kudu.apache.org/docs/quickstart.html[quickstart documentation] |
to set up and run the Kudu quickstart environment.
| |
| == Install Spark |
| |
Install Apache Spark on your host machine by following the Apache Spark
link:https://spark.apache.org/docs/latest/#downloading[installation documentation].
| |
NOTE: If you are on a Mac, you can use link:https://brew.sh/[Homebrew] to
install it:
| |
| [source,bash] |
| ---- |
| brew install apache-spark |
| ---- |
| |
| == Download the data |
| |
| To practice some typical operations with Kudu and Spark, we'll use the |
| link:https://data.sfgov.org/Transportation/Raw-AVL-GPS-data/5fk7-ivit/data[San Francisco MTA GPS dataset]. |
| This dataset contains raw location data transmitted periodically from sensors |
| installed on the buses in the SF MTA's fleet. |
| |
| 1. Download the sample data. |
| + |
| The SF MTA's site is often a bit slow, so we've mirrored a sample CSV file from the |
| dataset at http://kudu-sample-data.s3.amazonaws.com/sfmtaAVLRawData01012013.csv.gz |
| + |
| The original dataset uses DOS-style line endings, so we'll convert it to |
| UNIX-style during the upload process using `tr`. |
| + |
| There is also a missing line break after the header, so we add it using `sed`. |
| + |
| [source,bash] |
| ---- |
| wget http://kudu-sample-data.s3.amazonaws.com/sfmtaAVLRawData01012013.csv.gz |
| gunzip -c sfmtaAVLRawData01012013.csv.gz | tr -d '\r' | \ |
| sed 's/PREDICTABLE/PREDICTABLE\n/g' > sfmtaAVLRawData01012013.csv |
| ---- |
| |
| == Run the spark-shell with kudu-spark |
| |
| Run the `spark-shell` with the `kudu-spark` package: |
| |
| [source,bash] |
| ---- |
| spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.14.0 |
| ---- |
| |
| NOTE: The examples below assume you are in the `spark-shell` with the |
| `kudu-spark` package. |
| |
| NOTE: The examples below use `:paste` to support multiline syntax. |
As the `spark-shell` prompt notes, press `CTRL + D` after pasting the code to run it.
| |
| == Load and prepare the CSV data |
| |
| 1. Read the CSV data. |
| + |
| Read the plain text data into a Spark DataFrame and print the interpreted schema. |
| + |
| [source,scala] |
| ---- |
| :paste |
| val sfmta_raw = spark.sqlContext.read.format("csv") |
| .option("header", "true") |
| .option("inferSchema", "true") |
| .load("sfmtaAVLRawData01012013.csv") |
| |
| sfmta_raw.printSchema |
| sfmta_raw.createOrReplaceTempView("sfmta_raw") |
| spark.sql("SELECT count(*) FROM sfmta_raw").show() |
| spark.sql("SELECT * FROM sfmta_raw LIMIT 5").show() |
| ---- |
| |
| 2. Prepare the data to load into Kudu. |
| + |
To prepare the data we will:
| + |
| * Convert the `REPORT_TIME` from a string into a timestamp. |
| * Mark the primary key columns, `REPORT_TIME` and `VEHICLE_TAG`, as non-nullable. |
| + |
| [source,scala] |
| ---- |
| :paste |
| import org.apache.spark.sql.types._ |
| import org.apache.spark.sql.DataFrame |
| def setNotNull(df: DataFrame, columns: Seq[String]) : DataFrame = { |
| val schema = df.schema |
  // Rebuild the StructField for the specified columns with nullable = false.
| val newSchema = StructType(schema.map { |
| case StructField(c, t, _, m) if columns.contains(c) => StructField(c, t, nullable = false, m) |
| case y: StructField => y |
| }) |
| // Apply new schema to the DataFrame |
| df.sqlContext.createDataFrame(df.rdd, newSchema) |
| } |
| val sftmta_time = sfmta_raw |
| .withColumn("REPORT_TIME", to_timestamp($"REPORT_TIME", "MM/dd/yyyy HH:mm:ss")) |
| val sftmta_prep = setNotNull(sftmta_time, Seq("REPORT_TIME", "VEHICLE_TAG")) |
| sftmta_prep.printSchema |
| sftmta_prep.createOrReplaceTempView("sftmta_prep") |
| spark.sql("SELECT count(*) FROM sftmta_prep").show() |
| spark.sql("SELECT * FROM sftmta_prep LIMIT 5").show() |
| ---- |
| |
| == Load and prepare the Kudu table |
| |
1. Create a new Kudu table.
+
Create a Kudu table with 3 replicas and 4 hash partitions, using the schema
defined by the `sftmta_prep` DataFrame.
| + |
| [source,scala] |
| ---- |
| :paste |
| import collection.JavaConverters._ |
| import org.apache.kudu.client._ |
| import org.apache.kudu.spark.kudu._ |
| val kuduContext = new KuduContext("localhost:7051,localhost:7151,localhost:7251", spark.sparkContext) |
| |
| // Delete the table if it already exists. |
| if(kuduContext.tableExists("sfmta_kudu")) { |
| kuduContext.deleteTable("sfmta_kudu") |
| } |
| |
| kuduContext.createTable("sfmta_kudu", sftmta_prep.schema, |
| /* primary key */ Seq("REPORT_TIME", "VEHICLE_TAG"), |
| new CreateTableOptions() |
| .setNumReplicas(3) |
| .addHashPartitions(List("VEHICLE_TAG").asJava, 4)) |
| ---- |
| + |
| NOTE: The table is deleted and recreated if it already exists. |
| |
| 2. Load the Kudu table. |
| + |
| Insert the prepared data into the Kudu table using the `kuduContext`. |
| + |
| [source,scala] |
| ---- |
| :paste |
| kuduContext.insertRows(sftmta_prep, "sfmta_kudu") |
| // Create a DataFrame that points to the Kudu table we want to query. |
| val sfmta_kudu = spark.read |
| .option("kudu.master", "localhost:7051,localhost:7151,localhost:7251") |
| .option("kudu.table", "sfmta_kudu") |
| // We need to use leader_only because Kudu on Docker currently doesn't |
| // support Snapshot scans due to `--use_hybrid_clock=false`. |
| .option("kudu.scanLocality", "leader_only") |
| .format("kudu").load |
| sfmta_kudu.createOrReplaceTempView("sfmta_kudu") |
| spark.sql("SELECT count(*) FROM sfmta_kudu").show() |
| spark.sql("SELECT * FROM sfmta_kudu LIMIT 5").show() |
| ---- |
| |
| == Read and Modify Data |
| |
| Now that the data is stored in Kudu, you can run queries against it. |
| The following query finds the data point containing the highest recorded vehicle speed. |
| |
| [source,scala] |
| ---- |
| spark.sql("SELECT * FROM sfmta_kudu ORDER BY speed DESC LIMIT 1").show() |
| ---- |
| |
| The output should look something like this: |
| |
[source,text]
| ---- |
| +-------------+-------------+--------------------+-------------------+-------------------+---------+ |
| | report_time | vehicle_tag | longitude | latitude | speed | heading | |
| +-------------+-------------+--------------------+-------------------+-------------------+---------+ |
| | 1357022342 | 5411 | -122.3968811035156 | 37.76665878295898 | 68.33300018310547 | 82 | |
| +-------------+-------------+--------------------+-------------------+-------------------+---------+ |
| ---- |
| |
| With a quick link:https://www.google.com/search?q=122.3968811035156W+37.76665878295898N[Google search] |
we can see that this bus was traveling east on 16th Street at 68 MPH.
At first glance, this seems unlikely to be true. Suppose that after some research
we find that this bus's sensor equipment was faulty and decide to remove its data.
With Kudu, this is easy to do using Spark:
| |
| [source,scala] |
| ---- |
| spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show() |
| val toDelete = spark.sql("SELECT * FROM sfmta_kudu WHERE vehicle_tag = 5411") |
| kuduContext.deleteRows(toDelete, "sfmta_kudu") |
| spark.sql("SELECT count(*) FROM sfmta_kudu WHERE vehicle_tag = 5411").show() |
| ---- |
| |
| == Next steps |
| |
| The above example showed how to load, query, and mutate a static dataset with |
| Spark and Kudu. The real power of Kudu, however, is the ability to ingest and |
| mutate data in a streaming fashion. |
| |
| As an exercise to learn the Kudu programmatic APIs, try implementing a program |
| that uses the link:http://www.nextbus.com/xmlFeedDocs/NextBusXMLFeed.pdf[SFMTA XML data feed] |
| to ingest this same dataset in real time into the Kudu table. |
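
The skeleton below shows one possible shape for such a program. It is only a
sketch: `fetchLatestReadings()` is a hypothetical helper you would implement
yourself to poll the XML feed and return a DataFrame with the same schema as
`sftmta_prep`.

[source,scala]
----
// Sketch only. fetchLatestReadings() is a hypothetical helper that polls the
// SFMTA XML feed and returns a DataFrame matching the sfmta_kudu schema.
while (true) {
  val latest = fetchLatestReadings()
  // upsertRows inserts new readings and overwrites re-sent readings that share
  // the same (REPORT_TIME, VEHICLE_TAG) primary key.
  kuduContext.upsertRows(latest, "sfmta_kudu")
  Thread.sleep(60 * 1000) // poll roughly once a minute
}
----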
| |
| == Help |
| |
If you have questions, issues, or feedback on this quickstart guide, please reach out to the
| link:https://kudu.apache.org/community.html[Apache Kudu community]. |