blob: 9a3074010c2d8341e302084dda3e0a85897bf118 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
package org.apache.kudu.spark.examples
import collection.JavaConverters._
import org.slf4j.LoggerFactory
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.Row
object SparkExample {
// The list of RPC endpoints of Kudu masters in the cluster,
// separated by comma. The default value assumes the following:
// * the cluster runs a single Kudu master
// * the Kudu master runs at the default RPC port
// * the spark-submit is run at the Kudu master's node
val kuduMasters: String = System.getProperty("kuduMasters", "localhost:7051")
// The name of a table to create.
val tableName: String = System.getProperty("tableName", "kudu_spark_example")
// The replication factor of the table to create. Make sure at least this
// number of tablet servers are available when running this example app.
// Replication factors of 1, 3, 5, 7 are available out-of-the-box.
// The default value of 1 is chosen to provide maximum environment
// compatibility, but it's good only for small toy examples like this.
// For real-world scenarios the replication factor of 1 (i.e. keeping table's
// data non-replicated) is a bad idea in general: consider the replication
// factor of 3 and higher.
val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas", 1)
val nameCol = "name"
val idCol = "id"
val logger = LoggerFactory.getLogger(SparkExample.getClass)
// Define a class that we'll use to insert data into the table.
// Because we're defining a case class here, we circumvent the need to
// explicitly define a schema later in the code, like during our RDD -> toDF()
// calls later on.
case class User(name:String, id:Int)
def main(args: Array[String]) {
// Define our session and context variables for use throughout the program.
// The kuduContext is a serializable container for Kudu client connections,
// while the SparkSession is the entry point to SparkSQL and
// the Dataset/DataFrame API.
val spark = SparkSession.builder.appName("KuduSparkExample").getOrCreate()
val kuduContext = new KuduContext(kuduMasters, spark.sqlContext.sparkContext)
// Import a class from the SparkSession we instantiated above, to allow
// for easier RDD -> DF conversions.
import spark.implicits._
// The schema of the table we're going to create.
val schema = StructType(
List(
StructField(idCol, IntegerType, false),
StructField(nameCol, StringType, false)
)
)
var tableIsCreated = false
try {
// Make sure the table does not exist. This is mostly to demonstrate
// the capabilities of the API. In general, there might be a racing
// request to create the table coming from elsewhere, so even
// if tableExists() returned false at this time, the table might appear
// while createTable() is running below. In the latter case, appropriate
// Kudu exception will be thrown by createTable().
if (kuduContext.tableExists(tableName)) {
throw new RuntimeException(tableName + ": table already exists")
}
// Create the table with 3 hash partitions, resulting in 3 tablets,
// each with the specified number of replicas.
kuduContext.createTable(tableName, schema, Seq(idCol),
new CreateTableOptions()
.addHashPartitions(List(idCol).asJava, 3)
.setNumReplicas(tableNumReplicas))
tableIsCreated = true
// Write to the table.
logger.info(s"writing to table '$tableName'")
val data = Array(User("userA", 1234), User("userB", 5678))
val userRDD = spark.sparkContext.parallelize(data)
val userDF = userRDD.toDF()
kuduContext.insertRows(userDF, tableName)
// Read from the table using an RDD.
logger.info(s"reading back the rows just written")
val readCols = Seq(nameCol, idCol)
val readRDD = kuduContext.kuduRDD(spark.sparkContext, tableName, readCols)
val userTuple = readRDD.map { case Row(name: String, id: Int) => (name, id) }
userTuple.collect().foreach(println(_))
// Upsert some rows.
logger.info(s"upserting to table '$tableName'")
val upsertUsers = Array(User("newUserA", 1234), User("userC", 7777))
val upsertUsersRDD = spark.sparkContext.parallelize(upsertUsers)
val upsertUsersDF = upsertUsersRDD.toDF()
kuduContext.upsertRows(upsertUsersDF, tableName)
// Read the updated table using SparkSQL.
val sqlDF = spark.sqlContext.read.options(
Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).kudu
sqlDF.createOrReplaceTempView(tableName)
spark.sqlContext.sql(s"SELECT * FROM $tableName WHERE $idCol > 1000").show
} catch {
// Catch, log and re-throw. Not the best practice, but this is a very
// simplistic example.
case unknown : Throwable => logger.error(s"got an exception: " + unknown)
throw unknown
} finally {
// Clean up.
if (tableIsCreated) {
logger.info(s"deleting table '$tableName'")
kuduContext.deleteTable(tableName)
}
logger.info(s"closing down the session")
spark.close()
}
}
}