/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kudu.spark.kudu
import java.net.InetAddress
import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.apache.kudu.client.KuduPredicate.ComparisonOp
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduReadOptions._
import org.apache.kudu.spark.kudu.KuduWriteOptions._
import org.apache.kudu.spark.kudu.SparkUtil._
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode
/**
* Data source for integration with Spark's [[DataFrame]] API.
*
* Serves as a factory for [[KuduRelation]] instances for Spark. Spark
* automatically looks for a [[RelationProvider]] implementation named
* `DefaultSource` when the user specifies a data source's package name
* through [[org.apache.spark.sql.DataFrameReader.format]].
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class DefaultSource
extends DataSourceRegister with RelationProvider with CreatableRelationProvider
with SchemaRelationProvider with StreamSinkProvider {
val TABLE_KEY = "kudu.table"
val KUDU_MASTER = "kudu.master"
val OPERATION = "kudu.operation"
val FAULT_TOLERANT_SCANNER = "kudu.faultTolerantScan"
val SCAN_LOCALITY = "kudu.scanLocality"
val IGNORE_NULL = "kudu.ignoreNull"
val IGNORE_DUPLICATE_ROW_ERRORS = "kudu.ignoreDuplicateRowErrors"
val REPARTITION = "kudu.repartition"
val REPARTITION_SORT = "kudu.repartition.sort"
val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs"
val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs"
val BATCH_SIZE = "kudu.batchSize"
val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
val SPLIT_SIZE_BYTES = "kudu.splitSizeBytes"
/**
* A convenient alias for the data source, so that "kudu" can be used as the
* format in place of the fully qualified "org.apache.kudu.spark.kudu".
* Note: This class is discovered by Spark via the entry in
* `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister`
*/
override def shortName(): String = "kudu"
/**
* Construct a BaseRelation using the provided context and parameters.
*
* @param sqlContext SparkSQL context
* @param parameters parameters given to us from SparkSQL
* @return a BaseRelation Object
*/
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext, parameters, null)
}
/**
* Construct a BaseRelation using the provided context, parameters and schema.
*
* @param sqlContext SparkSQL context
* @param parameters parameters given to us from SparkSQL
* @param schema the schema used to select columns for the relation
* @return a BaseRelation Object
*/
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
val tableName = getTableName(parameters)
val kuduMaster = getMasterAddrs(parameters)
val operationType = getOperationType(parameters)
val schemaOption = Option(schema)
val readOptions = getReadOptions(parameters)
val writeOptions = getWriteOptions(parameters)
new KuduRelation(
tableName,
kuduMaster,
operationType,
schemaOption,
readOptions,
writeOptions
)(sqlContext)
}
/**
* Creates a relation and inserts data into the specified table.
*
* @param sqlContext SparkSQL context
* @param mode only Append mode is supported; the data is upserted or inserted
* into an existing table, depending on the `kudu.operation` parameter
* @param parameters required parameters such as kudu.table and kudu.master
* @param data the DataFrame to save into Kudu
* @return the populated base relation
*/
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val kuduRelation = createRelation(sqlContext, parameters)
mode match {
case SaveMode.Append =>
kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
case _ =>
throw new UnsupportedOperationException("Currently, only Append is supported")
}
kuduRelation
}
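// Illustrative usage sketch (not part of the API): appending a DataFrame to an
// existing Kudu table through this provider. Only SaveMode.Append is accepted;
// the write is an upsert unless "kudu.operation" says otherwise. The master
// address, table name, and `df` are hypothetical placeholders.
//
//   df.write
//     .format("kudu")
//     .option("kudu.master", "kudu-master-1:7051")
//     .option("kudu.table", "my_table")
//     .option("kudu.operation", "insert")
//     .mode("append")
//     .save()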
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val tableName = getTableName(parameters)
val masterAddrs = getMasterAddrs(parameters)
val operationType = getOperationType(parameters)
val readOptions = getReadOptions(parameters)
val writeOptions = getWriteOptions(parameters)
new KuduSink(
tableName,
masterAddrs,
operationType,
readOptions,
writeOptions
)(sqlContext)
}
private def getTableName(parameters: Map[String, String]): String = {
parameters.getOrElse(
TABLE_KEY,
throw new IllegalArgumentException(
s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
}
private def getReadOptions(parameters: Map[String, String]): KuduReadOptions = {
val batchSize = parameters.get(BATCH_SIZE).map(_.toInt).getOrElse(defaultBatchSize)
val faultTolerantScanner =
parameters.get(FAULT_TOLERANT_SCANNER).map(_.toBoolean).getOrElse(defaultFaultTolerantScanner)
val scanLocality =
parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality)
val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
val keepAlivePeriodMs =
parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
val splitSizeBytes = parameters.get(SPLIT_SIZE_BYTES).map(_.toLong)
KuduReadOptions(
batchSize,
scanLocality,
faultTolerantScanner,
keepAlivePeriodMs,
scanRequestTimeoutMs,
/* socketReadTimeoutMs= */ None,
splitSizeBytes)
}
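// Illustrative sketch of the read-tuning options parsed above, passed through the
// DataFrameReader. All values are hypothetical; any option left unset falls back
// to the KuduReadOptions default.
//
//   spark.read
//     .format("kudu")
//     .option("kudu.master", "kudu-master-1:7051")
//     .option("kudu.table", "my_table")
//     .option("kudu.batchSize", "10000")
//     .option("kudu.scanLocality", "closest_replica")
//     .option("kudu.faultTolerantScan", "true")
//     .option("kudu.scanRequestTimeoutMs", "30000")
//     .load()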
private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
val ignoreDuplicateRowErrors =
Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean).getOrElse(false) ||
Try(parameters(OPERATION) == "insert-ignore").getOrElse(false)
val ignoreNull =
parameters.get(IGNORE_NULL).map(_.toBoolean).getOrElse(defaultIgnoreNull)
val repartition =
parameters.get(REPARTITION).map(_.toBoolean).getOrElse(defaultRepartition)
val repartitionSort =
parameters.get(REPARTITION_SORT).map(_.toBoolean).getOrElse(defaultRepartitionSort)
KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull, repartition, repartitionSort)
}
private def getMasterAddrs(parameters: Map[String, String]): String = {
parameters.getOrElse(KUDU_MASTER, InetAddress.getLocalHost.getCanonicalHostName)
}
private def getScanLocalityType(opParam: String): ReplicaSelection = {
opParam.toLowerCase match {
case "leader_only" => ReplicaSelection.LEADER_ONLY
case "closest_replica" => ReplicaSelection.CLOSEST_REPLICA
case _ =>
throw new IllegalArgumentException(s"Unsupported replica selection type '$opParam'")
}
}
private def getOperationType(parameters: Map[String, String]): OperationType = {
parameters.get(OPERATION).map(stringToOperationType).getOrElse(Upsert)
}
private def stringToOperationType(opParam: String): OperationType = {
opParam.toLowerCase match {
case "insert" => Insert
case "insert-ignore" => Insert
case "upsert" => Upsert
case "update" => Update
case "delete" => Delete
case _ =>
throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
}
}
}
/**
* Implementation of Spark BaseRelation.
*
* @param tableName the Kudu table that the relation reads from and writes to
* @param masterAddrs Kudu master addresses
* @param operationType The default operation type to perform when writing to the relation
* @param userSchema A schema used to select columns for the relation
* @param readOptions Kudu read options
* @param writeOptions Kudu write options
* @param sqlContext SparkSQL context
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class KuduRelation(
val tableName: String,
val masterAddrs: String,
val operationType: OperationType,
val userSchema: Option[StructType],
val readOptions: KuduReadOptions = new KuduReadOptions,
val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan with InsertableRelation {
val log: Logger = LoggerFactory.getLogger(getClass)
private val context: KuduContext =
new KuduContext(masterAddrs, sqlContext.sparkContext)
private val table: KuduTable = context.syncClient.openTable(tableName)
private val estimatedSize: Long = {
try {
table.getTableStatistics().getOnDiskSize
} catch {
case e: Exception =>
log.warn(
"Error while getting table statistic from master, maybe the current" +
" master doesn't support the rpc, please check the version.",
e)
super.sizeInBytes
}
}
/**
* Estimated size of this relation in bytes. This information is used by Spark to
* decide whether it is safe to broadcast a relation, for example during join
* selection. It is better to overestimate this size than to underestimate it:
* underestimation may lead to an expensive execution plan, such as broadcasting a
* very large table, which wastes a great deal of network bandwidth.
* TODO(KUDU-2933): Consider projection and predicates in size estimation.
*
* @return size of this relation in bytes
*/
override def sizeInBytes: Long = estimatedSize
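/**
* Reports the filters that cannot be pushed down to Kudu; Spark evaluates these
* filters itself on the rows returned by the scan.
*
* @param filters the filters proposed for push-down
* @return the subset of filters that Kudu cannot handle
*/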
override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
filters.filterNot(KuduRelation.supportsFilter)
/**
* Generates a SparkSQL schema object so SparkSQL knows what is being
* provided by this BaseRelation.
*
* @return schema generated from the Kudu table's schema
*/
override def schema: StructType = {
sparkSchema(table.getSchema, userSchema.map(_.fieldNames))
}
/**
* Build the RDD to scan rows.
*
* @param requiredColumns the columns requested by the query
* @param filters the filters applied by the query
* @return an RDD with all the matching rows from Kudu
*/
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val predicates = filters.flatMap(filterToPredicate)
new KuduRDD(
context,
table,
requiredColumns,
predicates,
readOptions,
sqlContext.sparkContext
)
}
/**
* Converts a Spark [[Filter]] into the equivalent Kudu [[KuduPredicate]]s.
*
* @param filter the filter to convert
* @return the equivalent Kudu predicates, or an empty array if the filter
* cannot be pushed down
*/
private def filterToPredicate(filter: Filter): Array[KuduPredicate] = {
filter match {
case EqualTo(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.EQUAL, value))
case GreaterThan(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.GREATER, value))
case GreaterThanOrEqual(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, value))
case LessThan(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.LESS, value))
case LessThanOrEqual(column, value) =>
Array(comparisonPredicate(column, ComparisonOp.LESS_EQUAL, value))
case In(column, values) =>
Array(inListPredicate(column, values))
case StringStartsWith(column, prefix) =>
prefixInfimum(prefix) match {
case None =>
Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix))
case Some(inf) =>
Array(
comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix),
comparisonPredicate(column, ComparisonOp.LESS, inf))
}
case IsNull(column) => Array(isNullPredicate(column))
case IsNotNull(column) => Array(isNotNullPredicate(column))
case And(left, right) =>
filterToPredicate(left) ++ filterToPredicate(right)
case _ => Array()
}
}
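// Illustrative mapping (hypothetical column names):
//
//   GreaterThan("age", 18)          -> KuduPredicate: age > 18
//   StringStartsWith("name", "bar") -> KuduPredicates: name >= "bar" AND name < "bas"
//   And(IsNotNull("name"),
//       LessThan("age", 30))        -> KuduPredicates: name IS NOT NULL AND age < 30
//
// Filters that match no case above yield no predicate here and are instead
// evaluated by Spark (see unhandledFilters).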
/**
* Returns the smallest string `s` such that `t < s` for every string `t`
* having `p` as a prefix, if such a string exists.
*
* @param p the prefix
* @return Some(the prefix infimum), or None if none exists.
*/
private def prefixInfimum(p: String): Option[String] = {
p.reverse.dropWhile(_ == Char.MaxValue).reverse match {
case "" => None
case q => Some(q.slice(0, q.length - 1) + (q(q.length - 1) + 1).toChar)
}
}
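// Illustrative behavior (a sketch, not exhaustive):
//
//   prefixInfimum("abc")                  == Some("abd")
//   prefixInfimum("ab" + Char.MaxValue)   == Some("ac")
//   prefixInfimum(Char.MaxValue.toString) == None
//
// so StringStartsWith("col", "abc") is pushed down as `col >= "abc" AND col < "abd"`.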
/**
* Creates a new comparison predicate for the column, comparison operator, and comparison value.
*
* @param column the column name
* @param operator the comparison operator
* @param value the comparison value
* @return the comparison predicate
*/
private def comparisonPredicate(
column: String,
operator: ComparisonOp,
value: Any): KuduPredicate = {
KuduPredicate.newComparisonPredicate(table.getSchema.getColumn(column), operator, value)
}
/**
* Creates a new in list predicate for the column and values.
*
* @param column the column name
* @param values the values
* @return the in list predicate
*/
private def inListPredicate(column: String, values: Array[Any]): KuduPredicate = {
KuduPredicate.newInListPredicate(table.getSchema.getColumn(column), values.toList.asJava)
}
/**
* Creates a new `IS NULL` predicate for the column.
*
* @param column the column name
* @return the `IS NULL` predicate
*/
private def isNullPredicate(column: String): KuduPredicate = {
KuduPredicate.newIsNullPredicate(table.getSchema.getColumn(column))
}
/**
* Creates a new `IS NOT NULL` predicate for the column.
*
* @param column the column name
* @return the `IS NOT NULL` predicate
*/
private def isNotNullPredicate(column: String): KuduPredicate = {
KuduPredicate.newIsNotNullPredicate(table.getSchema.getColumn(column))
}
/**
* Writes data into an existing Kudu table.
*
* If the `kudu.operation` parameter is set, the data will use that operation
* type. If the parameter is unset, the data will be upserted.
*
* @param data [[DataFrame]] to be inserted into Kudu
* @param overwrite must be false; otherwise, throws [[UnsupportedOperationException]]
*/
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
if (overwrite) {
throw new UnsupportedOperationException("overwrite is not yet supported")
}
context.writeRows(data, tableName, operationType, writeOptions)
}
/**
* Returns the string representation of this KuduRelation
* @return "Kudu " followed by the table name of the relation
*/
override def toString(): String = {
"Kudu " + this.tableName
}
}
private[spark] object KuduRelation {
/**
* Returns `true` if the filter is able to be pushed down to Kudu.
*
* @param filter the filter to test
*/
// formatter: off
private def supportsFilter(filter: Filter): Boolean = filter match {
case EqualTo(_, _) | GreaterThan(_, _) | GreaterThanOrEqual(_, _) | LessThan(_, _) |
LessThanOrEqual(_, _) | In(_, _) | StringStartsWith(_, _) | IsNull(_) | IsNotNull(_) =>
true
case And(left, right) => supportsFilter(left) && supportsFilter(right)
case _ => false
}
// formatter: on
}
/**
* A structured streaming [[Sink]] that writes batches to Kudu.
*
* Sinks provide at-least-once semantics by retrying failed batches, and expose a
* `batchId` so that implementations can provide exactly-once semantics. Since Kudu
* does not track batch IDs internally, the batch ID is ignored here, and it is up
* to the user to specify an `operationType` that achieves the desired semantics
* when batches are retried.
*
* The default `Upsert` operation lets KuduSink tolerate the duplicate data that
* such retries can produce.
*
* Insert-ignore support (KUDU-1563) would be useful here; until it exists, using
* Upsert works. Delete-ignore would also be useful.
*/
class KuduSink(
val tableName: String,
val masterAddrs: String,
val operationType: OperationType,
val readOptions: KuduReadOptions = new KuduReadOptions,
val writeOptions: KuduWriteOptions)(val sqlContext: SQLContext)
extends Sink {
private val context: KuduContext =
new KuduContext(masterAddrs, sqlContext.sparkContext)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
context.writeRows(data, tableName, operationType, writeOptions)
}
}
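// Illustrative streaming usage sketch (not part of the API): writing a streaming
// DataFrame to Kudu through this sink. The stream `streamingDF`, the checkpoint
// path, the master address, and the table name are hypothetical placeholders;
// "kudu.operation" is chosen per the at-least-once notes above (upsert by default).
//
//   streamingDF.writeStream
//     .format("kudu")
//     .option("kudu.master", "kudu-master-1:7051")
//     .option("kudu.table", "my_table")
//     .option("kudu.operation", "upsert")
//     .option("checkpointLocation", "/tmp/kudu-sink-checkpoint")
//     .start()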