* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.kudu.spark.kudu
import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.apache.kudu.client.KuduPredicate.ComparisonOp
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduReadOptions._
import org.apache.kudu.spark.kudu.KuduWriteOptions._
import org.apache.kudu.spark.kudu.SparkUtil._
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode
* Data source for integration with Spark's [[DataFrame]] API.
* Serves as a factory for [[KuduRelation]] instances for Spark. Spark will
* automatically look for a [[RelationProvider]] implementation named
* `DefaultSource` when the user specifies the path of a source during DDL
* operations through [[org.apache.spark.sql.DataFrameReader.format]].
// Spark data source entry point for Kudu. It mixes in the register, relation-provider,
// creatable-relation-provider, schema-relation-provider and stream-sink-provider traits.
class DefaultSource
extends DataSourceRegister with RelationProvider with CreatableRelationProvider
with SchemaRelationProvider with StreamSinkProvider {
// Option keys looked up in the `parameters` map that Spark hands to this source.
// Key named in getTableName's "table name must be specified" error message.
val TABLE_KEY = "kudu.table"
// Key read by getMasterAddrs; when absent the local host's canonical name is used.
val KUDU_MASTER = "kudu.master"
// Write operation name; the exact value "insert-ignore" also enables
// ignoreDuplicateRowErrors in getWriteOptions.
val OPERATION = "kudu.operation"
val FAULT_TOLERANT_SCANNER = "kudu.faultTolerantScan"
val SCAN_LOCALITY = "kudu.scanLocality"
val IGNORE_NULL = "kudu.ignoreNull"
// Parsed as a Boolean in getWriteOptions; a missing or unparsable value counts as false.
val IGNORE_DUPLICATE_ROW_ERRORS = "kudu.ignoreDuplicateRowErrors"
val REPARTITION = "kudu.repartition"
val REPARTITION_SORT = "kudu.repartition.sort"
// Parsed as a Long in getReadOptions; optional.
val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs"
// NOTE(review): no visible line in this file reads this key — confirm whether it is still honoured.
val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs"
// Parsed as an Int in getReadOptions; falls back to defaultBatchSize when absent.
val BATCH_SIZE = "kudu.batchSize"
val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
// Parsed as a Long in getReadOptions; optional.
val SPLIT_SIZE_BYTES = "kudu.splitSizeBytes"
* A nice alias for the data source so that when specifying the format
* "kudu" can be used in place of "org.apache.kudu.spark.kudu".
* Note: This class is discovered by Spark via the entry in
* `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister`
/**
 * Short alias under which this data source is registered.
 *
 * @return the literal format name `kudu`
 */
override def shortName(): String = {
  "kudu"
}
* Construct a BaseRelation using the provided context and parameters.
* @param sqlContext SparkSQL context
* @param parameters parameters given to us from SparkSQL
* @return a BaseRelation Object
/**
 * Builds a relation from the Spark-supplied options when the caller gives no schema.
 *
 * Delegates to the schema-aware overload, passing `null` as the schema.
 *
 * @param sqlContext SparkSQL context
 * @param parameters parameters given to us from SparkSQL
 * @return a BaseRelation object
 */
override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation = {
  // The three-argument overload wraps its schema argument in Option(...), so null means "none".
  val noUserSchema: StructType = null
  createRelation(sqlContext, parameters, noUserSchema)
}
* Construct a BaseRelation using the provided context, parameters and schema.
* @param sqlContext SparkSQL context
* @param parameters parameters given to us from SparkSQL
* @param schema the schema used to select columns for the relation
* @return a BaseRelation Object
// Schema-aware overload: resolves the table name, master addresses, operation type and the
// read/write options from `parameters`, and wraps the possibly-null `schema` in an Option.
// NOTE(review): the KuduRelation constructor call below is cut off in this view, so the
// arguments actually passed to it are not visible here.
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
val tableName = getTableName(parameters)
val kuduMaster = getMasterAddrs(parameters)
val operationType = getOperationType(parameters)
// Option(null) evaluates to None, so a missing user schema becomes None.
val schemaOption = Option(schema)
val readOptions = getReadOptions(parameters)
val writeOptions = getWriteOptions(parameters)
new KuduRelation(
* Creates a relation and inserts data to specified table.
* @param sqlContext SparkSQL context
* @param mode Only Append mode is supported. It will upsert or insert data
* to an existing table, depending on the upsert parameter
* @param parameters Necessary parameters for kudu.table, kudu.master, etc...
* @param data Dataframe to save into kudu
* @return returns populated base relation
// Write path: builds the relation from `parameters`, then for SaveMode.Append inserts
// `data` with overwrite = false. Every other save mode throws UnsupportedOperationException.
// NOTE(review): the relation is created before the mode is checked, so an unsupported mode
// is rejected only after the relation has been constructed.
// NOTE(review): the expression that yields the returned BaseRelation is not visible in this
// view — presumably `kuduRelation`; confirm.
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val kuduRelation = createRelation(sqlContext, parameters)
mode match {
case SaveMode.Append =>
// The two-argument createRelation is typed as BaseRelation, hence the downcast.
kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
case _ =>
throw new UnsupportedOperationException("Currently, only Append is supported")
// Streaming sink factory: resolves the table name, master addresses, operation type and
// the read/write options from `parameters`.
// `partitionColumns` and `outputMode` are not referenced by any visible line.
// NOTE(review): the KuduSink constructor call below is cut off in this view, so the
// arguments actually passed to it are not visible here.
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val tableName = getTableName(parameters)
val masterAddrs = getMasterAddrs(parameters)
val operationType = getOperationType(parameters)
val readOptions = getReadOptions(parameters)
val writeOptions = getWriteOptions(parameters)
new KuduSink(
// Resolves the Kudu table name from the options.
// NOTE(review): the lookup expression that wraps this throw is not visible in this view;
// only the failure path (an IllegalArgumentException naming TABLE_KEY) can be seen.
private def getTableName(parameters: Map[String, String]): String = {
throw new IllegalArgumentException(
s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
// Collects the scan-related settings from the options into a KuduReadOptions.
// NOTE(review): the right-hand sides of faultTolerantScanner, scanLocality and
// keepAlivePeriodMs, and the KuduReadOptions construction itself, are not visible here.
private def getReadOptions(parameters: Map[String, String]): KuduReadOptions = {
// Parsed with toInt, so a non-numeric value throws NumberFormatException;
// falls back to defaultBatchSize when the key is absent.
val batchSize = parameters.get(BATCH_SIZE).map(_.toInt).getOrElse(defaultBatchSize)
val faultTolerantScanner =
val scanLocality =
// Optional: None when the key is absent.
val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
val keepAlivePeriodMs =
// Optional: None when the key is absent.
val splitSizeBytes = parameters.get(SPLIT_SIZE_BYTES).map(_.toLong)
// The socket read timeout argument is passed as None rather than read from the options.
/* socketReadTimeoutMs= */ None,
// Collects the write-related settings from the options into a KuduWriteOptions.
// NOTE(review): the right-hand sides of ignoreNull, repartition and repartitionSort are
// not visible in this view.
private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
// True when the explicit option parses to true OR the operation is "insert-ignore".
// Each Try(...).getOrElse(false) turns a missing key (or an unparsable Boolean) into false.
// NOTE(review): this comparison is case-sensitive, whereas stringToOperationType
// lower-cases its input, so "INSERT-IGNORE" maps to Insert without enabling this flag — confirm intent.
val ignoreDuplicateRowErrors =
Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean).getOrElse(false) ||
Try(parameters(OPERATION) == "insert-ignore").getOrElse(false)
val ignoreNull =
val repartition =
val repartitionSort =
KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull, repartition, repartitionSort)
/**
 * Resolves the Kudu master address string from the options.
 *
 * @param parameters parameters given to us from SparkSQL
 * @return the value stored under `KUDU_MASTER`, or the canonical host name of the
 *         local machine when that key is absent
 */
private def getMasterAddrs(parameters: Map[String, String]): String = {
  parameters.get(KUDU_MASTER) match {
    case Some(configuredAddrs) => configuredAddrs
    // The local host lookup only happens when no master was configured.
    case None => InetAddress.getLocalHost.getCanonicalHostName
  }
}
/**
 * Maps a replica-selection option string to a Kudu [[ReplicaSelection]].
 *
 * Matching is case-insensitive and independent of the JVM default locale.
 *
 * @param opParam the option value, e.g. `leader_only` or `closest_replica`
 * @return the matching [[ReplicaSelection]]
 * @throws IllegalArgumentException if the value names no supported selection type
 */
private def getScanLocalityType(opParam: String): ReplicaSelection = {
  // Locale.ROOT keeps case folding locale-independent: under a Turkish or Azeri default
  // locale a plain toLowerCase turns 'I' into dotless 'ı', so "CLOSEST_REPLICA" would
  // never match.
  opParam.toLowerCase(java.util.Locale.ROOT) match {
    case "leader_only" => ReplicaSelection.LEADER_ONLY
    case "closest_replica" => ReplicaSelection.CLOSEST_REPLICA
    case _ =>
      throw new IllegalArgumentException(s"Unsupported replica selection type '$opParam'")
  }
}
// Resolves the write OperationType from the options.
// NOTE(review): the body is not visible in this view; the insert() documentation in this
// file says an unset `kudu.operation` means upsert — confirm against the full source.
private def getOperationType(parameters: Map[String, String]): OperationType = {
/**
 * Maps an operation name to the corresponding [[OperationType]].
 *
 * Matching is case-insensitive and independent of the JVM default locale.
 * `insert-ignore` maps to `Insert`; the "ignore" part is handled separately through the
 * write options (see `getWriteOptions`).
 *
 * @param opParam the operation name: `insert`, `insert-ignore`, `upsert`, `update` or `delete`
 * @return the matching [[OperationType]]
 * @throws IllegalArgumentException if the name is not one of the supported operations
 */
private def stringToOperationType(opParam: String): OperationType = {
  // Locale.ROOT keeps case folding locale-independent: under a Turkish or Azeri default
  // locale a plain toLowerCase turns "INSERT" into "ınsert", which matches nothing below.
  opParam.toLowerCase(java.util.Locale.ROOT) match {
    case "insert" => Insert
    case "insert-ignore" => Insert
    case "upsert" => Upsert
    case "update" => Update
    case "delete" => Delete
    case _ =>
      throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
  }
}
* Implementation of Spark BaseRelation.
* @param tableName Kudu table that we plan to read from
* @param masterAddrs Kudu master addresses
* @param operationType The default operation type to perform when writing to the relation
* @param userSchema A schema used to select columns for the relation
* @param readOptions Kudu read options
* @param writeOptions Kudu write options
* @param sqlContext SparkSQL context
// Spark relation backed by a single Kudu table; supports pruned/filtered scans and inserts.
// `sqlContext` sits in a second parameter list. readOptions and writeOptions default to
// freshly constructed option objects.
class KuduRelation(
val tableName: String,
val masterAddrs: String,
val operationType: OperationType,
val userSchema: Option[StructType],
val readOptions: KuduReadOptions = new KuduReadOptions,
val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan with InsertableRelation {
val log: Logger = LoggerFactory.getLogger(getClass)
// Kudu context built from the master addresses and the Spark context.
private val context: KuduContext =
new KuduContext(masterAddrs, sqlContext.sparkContext)
// The table is opened while the relation is being constructed, via the context's sync client.
private val table: KuduTable = context.syncClient.openTable(tableName)
// Size estimate computed once during construction and served by sizeInBytes.
// NOTE(review): the try body and most of the handler are not visible in this view. The
// visible message suggests the value comes from table statistics requested from the
// master and that a failure is reported rather than rethrown — confirm.
private val estimatedSize: Long = {
try {
} catch {
case e: Exception =>
"Error while getting table statistic from master, maybe the current" +
" master doesn't support the rpc, please check the version.",
* Estimated size of this relation in bytes, this information is used by spark to
* decide whether it is safe to broadcast a relation such as in join selection. It
* is always better to overestimate this size than underestimate, because underestimation
* may lead to expensive execution plan such as broadcasting a very large table which
* will cause great network bandwidth consumption.
* TODO(KUDU-2933): Consider projection and predicates in size estimation.
* @return size of this relation in bytes
/**
 * Size of this relation in bytes, as computed once when the relation was constructed.
 *
 * @return the cached `estimatedSize`
 */
override def sizeInBytes: Long = {
  estimatedSize
}
// Overrides BaseRelation.unhandledFilters.
// NOTE(review): the body is not visible in this view; presumably it keeps the filters that
// KuduRelation.supportsFilter rejects — confirm against the full source.
override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
* Generates a SparkSQL schema object so SparkSQL knows what is being
* provided by this BaseRelation.
* @return schema generated from the Kudu table's schema
// SparkSQL schema exposed by this relation.
// NOTE(review): the body is not visible in this view; how `userSchema` and the Kudu table
// schema are combined cannot be told from here.
override def schema: StructType = {
* Build the RDD to scan rows.
* @param requiredColumns columns that are being requested by the requesting query
* @param filters filters that are being applied by the requesting query
* @return RDD with all the results from Kudu
// Builds the scan RDD for the requested columns and pushed-down filters.
// NOTE(review): the KuduRDD constructor call below is cut off in this view, so how
// `requiredColumns` and `predicates` are passed on is not visible here.
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// Each filter may yield zero, one or several Kudu predicates; flatMap concatenates them.
val predicates = filters.flatMap(filterToPredicate)
new KuduRDD(
* Converts a Spark [[Filter]] to a Kudu [[KuduPredicate]].
* @param filter the filter to convert
* @return the converted filter
// Translates one Spark filter into zero or more Kudu predicates.
// Filters with no translation yield an empty array.
private def filterToPredicate(filter: Filter): Array[KuduPredicate] = {
filter match {
// Simple comparisons map one-to-one onto Kudu comparison operators.
case EqualTo(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.EQUAL, value))
case GreaterThan(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.GREATER, value))
case GreaterThanOrEqual(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, value))
case LessThan(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.LESS, value))
case LessThanOrEqual(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.LESS_EQUAL, value))
case In(column, values) =>
Array(inListPredicate(column, values))
// A prefix match becomes a range: column >= prefix and, when an upper bound exists,
// column < prefixInfimum(prefix).
case StringStartsWith(column, prefix) =>
prefixInfimum(prefix) match {
// No upper bound exists (empty prefix or only Char.MaxValue characters): lower bound only.
case None =>
Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix))
// NOTE(review): the expression that opens this two-predicate result is not visible in
// this view — presumably an `Array(`; confirm.
case Some(inf) =>
comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix),
comparisonPredicate(column, ComparisonOp.LESS, inf))
case IsNull(column) => Array(isNullPredicate(column))
case IsNotNull(column) => Array(isNotNullPredicate(column))
// A conjunction is the concatenation of the predicates of both sides.
case And(left, right) =>
filterToPredicate(left) ++ filterToPredicate(right)
// Anything else produces no predicate.
case _ => Array()
* Returns the smallest string s such that, if p is a prefix of t,
* then t < s, if one exists.
* @param p the prefix
* @return Some(the prefix infimum), or None if none exists.
/**
 * Computes the exclusive upper bound for all strings that start with `p`.
 *
 * Trailing `Char.MaxValue` characters cannot be incremented, so they are dropped; the
 * last remaining character is then incremented by one.
 *
 * @param p the prefix
 * @return Some(bound), or None when `p` is empty or consists only of `Char.MaxValue`
 */
private def prefixInfimum(p: String): Option[String] = {
  // Index of the right-most character that can still be incremented, or -1 if none.
  val lastIncrementable = p.lastIndexWhere(_ != Char.MaxValue)
  if (lastIncrementable < 0) {
    None
  } else {
    val bumped = (p.charAt(lastIncrementable) + 1).toChar
    Some(p.substring(0, lastIncrementable) + bumped)
  }
}
* Creates a new comparison predicate for the column, comparison operator, and comparison value.
* @param column the column name
* @param operator the comparison operator
* @param value the comparison value
* @return the comparison predicate
/**
 * Builds a Kudu comparison predicate on a column of this relation's table.
 *
 * @param column the column name, resolved against the table schema
 * @param operator the comparison operator
 * @param value the value to compare the column against
 * @return the comparison predicate
 */
private def comparisonPredicate(
    column: String,
    operator: ComparisonOp,
    value: Any): KuduPredicate = {
  val columnSchema = table.getSchema.getColumn(column)
  KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
}
* Creates a new in list predicate for the column and values.
* @param column the column name
* @param values the values
* @return the in list predicate
/**
 * Builds a Kudu IN-list predicate on a column of this relation's table.
 *
 * @param column the column name, resolved against the table schema
 * @param values the accepted values
 * @return the in-list predicate
 */
private def inListPredicate(column: String, values: Array[Any]): KuduPredicate = {
  val columnSchema = table.getSchema.getColumn(column)
  // The Kudu client takes a java.util.List, so convert the Scala array first.
  val javaValues = values.toList.asJava
  KuduPredicate.newInListPredicate(columnSchema, javaValues)
}
* Creates a new `IS NULL` predicate for the column.
* @param column the column name
* @return the `IS NULL` predicate
// Builds the `IS NULL` predicate for `column`.
// NOTE(review): the body is not visible in this view.
private def isNullPredicate(column: String): KuduPredicate = {
* Creates a new `IS NOT NULL` predicate for the column.
* @param column the column name
* @return the `IS NOT NULL` predicate
// Builds the `IS NOT NULL` predicate for `column`.
// NOTE(review): the body is not visible in this view.
private def isNotNullPredicate(column: String): KuduPredicate = {
* Writes data into an existing Kudu table.
* If the `kudu.operation` parameter is set, the data will use that operation
* type. If the parameter is unset, the data will be upserted.
* @param data [[DataFrame]] to be inserted into Kudu
* @param overwrite must be false; otherwise, throws [[UnsupportedOperationException]]
/**
 * Writes `data` into the existing Kudu table using this relation's operation type and
 * write options.
 *
 * @param data [[DataFrame]] to be written to Kudu
 * @param overwrite must be false; overwriting is not supported
 * @throws UnsupportedOperationException if `overwrite` is true
 */
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
  if (!overwrite) {
    context.writeRows(data, tableName, operationType, writeOptions)
  } else {
    throw new UnsupportedOperationException("overwrite is not yet supported")
  }
}
* Returns the string representation of this KuduRelation
* @return Kudu + tableName of the relation
/**
 * String form of this relation.
 *
 * @return the text `Kudu ` followed by the table name
 */
override def toString(): String = {
  s"Kudu $tableName"
}
// Companion helpers for KuduRelation, visible within the `spark` package.
private[spark] object KuduRelation {
* Returns `true` if the filter is able to be pushed down to Kudu.
* @param filter the filter to test
// formatter: off
// The listed leaf filter types are the same ones filterToPredicate translates. An And is
// supported only when both sides are; every other filter is rejected.
// NOTE(review): the result expression of the first case is not visible in this view —
// presumably `true`; confirm.
private def supportsFilter(filter: Filter): Boolean = filter match {
case EqualTo(_, _) | GreaterThan(_, _) | GreaterThanOrEqual(_, _) | LessThan(_, _) |
LessThanOrEqual(_, _) | In(_, _) | StringStartsWith(_, _) | IsNull(_) | IsNotNull(_) =>
case And(left, right) => supportsFilter(left) && supportsFilter(right)
case _ => false
// formatter: on
* Sinks provide at-least-once semantics by retrying failed batches,
* and provide a `batchId` interface to implement exactly-once-semantics.
* Since Kudu does not internally track batch IDs, this is ignored,
* and it is up to the user to specify an appropriate `operationType` to achieve
* the desired semantics when adding batches.
* The default `Upsert` allows for KuduSink to handle duplicate data and such retries.
* Insert ignore support (KUDU-1563) would be useful, but while that doesn't exist,
* using Upsert will work. Delete ignore would also be useful.
// Structured-streaming sink that writes each batch to a Kudu table.
// `sqlContext` sits in a second parameter list. `readOptions` is accepted but not
// referenced by any visible line of this class.
// NOTE(review): the class may continue past the last visible line of this view.
class KuduSink(
val tableName: String,
val masterAddrs: String,
val operationType: OperationType,
val readOptions: KuduReadOptions = new KuduReadOptions,
val writeOptions: KuduWriteOptions)(val sqlContext: SQLContext)
extends Sink {
// Kudu context built from the master addresses and the Spark context.
private val context: KuduContext =
new KuduContext(masterAddrs, sqlContext.sparkContext)
// Writes the batch with the configured operation type; `batchId` is not used.
override def addBatch(batchId: Long, data: DataFrame): Unit = {
context.writeRows(data, tableName, operationType, writeOptions)