/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.spark.v2.reader

import com.fasterxml.jackson.core.`type`.TypeReference
import org.apache.druid.java.util.common.{Intervals, JodaUtils}
import org.apache.druid.spark.MAPPER
import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}
import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
import org.apache.druid.spark.mixins.Logging
import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}
import org.apache.druid.timeline.DataSegment
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,
SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.joda.time.Interval
import java.util.{List => JList}
import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}

/**
 * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying the Druid
 * metadata server to determine where the relevant segments live in deep storage and then reading those segments
 * into memory itself, in order to avoid straining the Druid cluster. In general, users should not instantiate this
 * class directly but should instead use sparkSession.read.format("druid").options(Map(...)).load(). If the schema
 * of the data in Druid is known, overhead can be further reduced by providing it directly (e.g.
 * sparkSession.read.format("druid").schema(schema).options...).
*
* To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.
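 *
 * For example (a sketch; "table" is assumed to mirror DruidConfigurationKeys.tableKey, and real reads will
 * usually need additional options such as metadata client configuration):
 * {{{
 *   val df = sparkSession
 *     .read
 *     .format("druid")
 *     .options(Map("table" -> "wikipedia"))
 *     .load()
 * }}}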
*/
class DruidDataSourceReader(
var schema: Option[StructType] = None,
conf: Configuration
) extends DataSourceReader
with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {
private lazy val metadataClient =
DruidDataSourceReader.createDruidMetaDataClient(conf)
private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)
private var filters: Array[Filter] = Array.empty
private var druidColumnTypes: Option[Set[String]] = Option.empty

  override def readSchema(): StructType = {
if (schema.isDefined) {
schema.get
} else {
require(conf.isPresent(DruidConfigurationKeys.tableKey),
s"Must set ${DruidConfigurationKeys.tableKey}!")
// TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
// twice the granularity duration, we can send a list with two disjoint intervals and
// minimize the load on the broker from having to merge large numbers of segments
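      //  (for illustration: with DAY granularity and bounds 2020-01-01T12 to 2020-01-09T12, we could query
      //  [2020-01-01/2020-01-02, 2020-01-09/2020-01-10] instead of one interval spanning all eight days)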
val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
val columnMap = druidClient.getSchema(
conf.getString(DruidConfigurationKeys.tableKey),
Some(List[Interval](Intervals.utc(
lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
upperBound.getOrElse(JodaUtils.MAX_INSTANT)
)))
)
schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
schema.get
}
}

  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
// For now, one partition for each Druid segment partition
// Future improvements can use information from SegmentAnalyzer results to do smart things
if (schema.isEmpty) {
readSchema()
}
val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
val filter = FilterUtils.mapFilters(filters, schema.get).map(_.optimize())
val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
// Allow passing hard-coded list of segments to load
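    // (for illustration: the value is a JSON array of Druid DataSegment objects as serialized by MAPPER,
    //  e.g. [{"dataSource": "wikipedia", "interval": "2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z", ...}])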
if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
val segments: JList[DataSegment] = MAPPER.readValue(
readerConf.getString(DruidConfigurationKeys.segmentsKey),
new TypeReference[JList[DataSegment]]() {}
)
segments.asScala
.map(segment =>
new DruidInputPartition(
segment,
schema.get,
filter,
druidColumnTypes,
conf,
useSparkConfForDeepStorage,
useCompactSketches,
useDefaultNullHandling
): InputPartition[InternalRow]
).asJava
} else {
getSegments
.map(segment =>
new DruidInputPartition(
segment,
schema.get,
filter,
druidColumnTypes,
conf,
useSparkConfForDeepStorage,
useCompactSketches,
useDefaultNullHandling
): InputPartition[InternalRow]
).asJava
}
}

  override def pruneColumns(structType: StructType): Unit = {
schema = Option(structType)
}

  /**
   * Given an array of Spark filters FILTERS, stores the filters this reader can push down in this.filters and
   * returns an array containing the filters in FILTERS this reader cannot support.
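   *
   * For example (a sketch, assuming a schema containing a string column "dim" and nothing named "unknownCol"):
   * {{{
   *   import org.apache.spark.sql.sources.EqualTo
   *   val unsupported = reader.pushFilters(Array(
   *     EqualTo("dim", "a"),       // pushed down: references a known column
   *     EqualTo("unknownCol", "b") // handed back: references a column absent from the schema
   *   ))
   * }}}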
*
* @param filters An array of filters to evaluate for predicate pushdown support.
* @return The filters in FILTERS this reader does not support.
*/
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
readSchema()
val useSqlCompatibleNullHandling = !conf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
filters.partition(FilterUtils.isSupportedFilter(_, schema.get, useSqlCompatibleNullHandling)) match {
case (supported, unsupported) =>
this.filters = supported
unsupported
}
}

  override def pushedFilters(): Array[Filter] = filters

  private[reader] def getSegments: Seq[DataSegment] = {
require(conf.isPresent(DruidConfigurationKeys.tableKey),
s"Must set ${DruidConfigurationKeys.tableKey}!")
    // Check filters for any bounds on __time;
    // otherwise, we'd need to do a full scan of the segments table.
val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
metadataClient.getSegmentPayloads(
conf.getString(DruidConfigurationKeys.tableKey),
lowerTimeBound,
upperTimeBound,
conf.getBoolean(DruidConfigurationKeys.allowIncompletePartitionsDefaultKey)
)
}

  override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
if (schema.isEmpty) {
readSchema()
}
val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
val filter = FilterUtils.mapFilters(filters, schema.get).map(_.optimize())
val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
val batchSize = readerConf.getInt(DruidConfigurationKeys.batchSizeDefaultKey)
// Allow passing hard-coded list of segments to load
if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
val segments: JList[DataSegment] = MAPPER.readValue(
readerConf.getString(DruidConfigurationKeys.segmentsKey),
new TypeReference[JList[DataSegment]]() {}
)
segments.asScala
.map(segment =>
new DruidColumnarInputPartition(
segment,
schema.get,
filter,
druidColumnTypes,
conf,
useSparkConfForDeepStorage,
useCompactSketches,
useDefaultNullHandling,
batchSize
): InputPartition[ColumnarBatch]
).asJava
} else {
getSegments
.map(segment =>
new DruidColumnarInputPartition(
segment,
schema.get,
filter,
druidColumnTypes,
conf,
useSparkConfForDeepStorage,
useCompactSketches,
useDefaultNullHandling,
batchSize
): InputPartition[ColumnarBatch]
).asJava
}
}

  override def enableBatchRead(): Boolean = {
// Fail fast
if (!conf.dive(DruidConfigurationKeys.readerPrefix).getBoolean(DruidConfigurationKeys.vectorizeDefaultKey)) {
false
} else {
if (schema.isEmpty) {
readSchema()
}
val filterOpt = FilterUtils.mapFilters(filters, schema.get)
filterOpt.fold(true) { filter =>
val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
val canVectorize = filter.toOptimizedFilter.canVectorizeMatcher(rowSignature)
if (!canVectorize) {
logWarn("Vectorization enabled in config but pushed-down filters are not vectorizable! Reading rows.")
}
canVectorize
}
}
}
}

object DruidDataSourceReader {
def apply(schema: StructType, dataSourceOptions: DataSourceOptions): DruidDataSourceReader = {
new DruidDataSourceReader(Option(schema), Configuration(dataSourceOptions))
}

  def apply(dataSourceOptions: DataSourceOptions): DruidDataSourceReader = {
new DruidDataSourceReader(None, Configuration(dataSourceOptions))
}

  /* Unfortunately, there's no single method of interacting with a Druid cluster that provides all
* three operations we need: get segment locations, get dataSource schemata, and publish segments.
*
* Segment locations can be determined either via direct interaction with the metadata server or
* via the coordinator API, but not via querying the `sys.segments` table served from a cluster
* since the `sys.segments` table prunes load specs.
*
* Data source schemata can be determined via querying the `INFORMATION_SCHEMA.COLUMNS` table, via
* SegmentMetadataQueries, or via pulling segments into memory and analyzing them.
* SegmentMetadataQueries can be expensive and time-consuming for large numbers of segments. This
* could be worked around by only checking the first and last segments for an interval, which
* would catch schema evolution that spans the interval to query, but not schema evolution within
* the interval and would prevent determining accurate statistics. Likewise, pulling segments into
* memory on the driver to check their schema is expensive and inefficient and has the same schema
* evolution and accurate statistics problem.
* The `INFORMATION_SCHEMA.COLUMNS` table does not contain information about whether or not a column
* could contain multiple values and does not know the actual metric type for complex types. Less
* relevantly, we wouldn't have access to possibly useful statistics about the segments
* that could be used to perform more efficient reading, and the Druid cluster to read from would
   * need to have SQL querying enabled and be running a version of Druid >= 0.14. Since we're
   * not currently doing any intelligent partitioning for reads, this doesn't really matter.
*
* Publishing segments can only be done via direct interaction with the metadata server.
*
   * Since there's no way to satisfy these constraints with a single method of interaction, we
   * need to use both a metadata client and a Druid client. The metadata client fetches segment
   * locations and publishes segments, and the Druid client issues SegmentMetadata queries to
   * determine dataSource schemata.
*/
def createDruidMetaDataClient(conf: Configuration): DruidMetadataClient = {
DruidMetadataClient(conf)
}

  def createDruidClient(conf: Configuration): DruidClient = {
DruidClient(conf)
}
}