/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.spark.clients

import com.fasterxml.jackson.core.`type`.TypeReference
import com.google.common.base.Suppliers
import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler
import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils, StringUtils}
import org.apache.druid.metadata.{DynamicConfigProvider, MetadataStorageConnectorConfig,
MetadataStorageTablesConfig, SQLMetadataConnector}
import org.apache.druid.spark.MAPPER
import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
import org.apache.druid.spark.mixins.Logging
import org.apache.druid.spark.registries.SQLConnectorRegistry
import org.apache.druid.timeline.{DataSegment, Partitions, VersionedIntervalTimeline}
import org.skife.jdbi.v2.{DBI, Handle}
import java.util.Properties
import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter,
asScalaSetConverter, mapAsJavaMapConverter}

class DruidMetadataClient(
metadataDbType: String,
host: String,
port: Int,
connectUri: String,
user: String,
passwordProviderSer: String,
dbcpMap: Properties,
base: String = "druid"
) extends Logging {
private lazy val druidMetadataTableConfig = MetadataStorageTablesConfig.fromBase(base)
private lazy val dbcpProperties = new Properties(dbcpMap)
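// PASSWORDPROVIDERSER is a serialized DynamicConfigProvider. As an illustrative example (assuming Druid's
// map-backed provider), it might look like {"type":"mapString","config":{"password":"<password>"}}.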
private lazy val password = if (passwordProviderSer == "") {
// Jackson doesn't like deserializing empty strings
passwordProviderSer
} else {
MAPPER.readValue[DynamicConfigProvider[String]](
passwordProviderSer, new TypeReference[DynamicConfigProvider[String]] {}
).getConfig.getOrDefault("password", "")
}
private lazy val connectorConfig: MetadataStorageConnectorConfig =
new MetadataStorageConnectorConfig
{
override def isCreateTables: Boolean = false
override def getHost: String = host
override def getPort: Int = port
override def getConnectURI: String = connectUri
override def getUser: String = user
override def getPassword: String = password
override def getDbcpProperties: Properties = dbcpProperties
}
private lazy val connectorConfigSupplier = Suppliers.ofInstance(connectorConfig)
private lazy val metadataTableConfigSupplier = Suppliers.ofInstance(druidMetadataTableConfig)
private lazy val connector = buildSQLConnector()
/**
* Get the non-overshadowed used segments for DATASOURCE between INTERVALSTART and INTERVALEND. If either interval
* endpoint is None, JodaUtils.MIN_INSTANT/MAX_INSTANT is used instead. By default, only segments for complete
* partitions are returned. This behavior can be overridden by setting ALLOWINCOMPLETEPARTITIONS, in which case all
* non-overshadowed segments in the interval will be returned, regardless of completeness.
*
* @param datasource The Druid data source to get segment payloads for.
* @param intervalStart The start of the interval to fetch segment payloads for. If None, MIN_INSTANT is used.
* @param intervalEnd The end of the interval to fetch segment payloads for. If None, MAX_INSTANT is used.
* @param allowIncompletePartitions Whether to include segments from incomplete partitions.
* @return A Sequence of DataSegments representing all non-overshadowed used segments for the given data source and
* interval.
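* @example A minimal usage sketch, with a hypothetical datasource name and interval bounds, assuming a client
* built via DruidMetadataClient(conf):
* {{{
* val segments = client.getSegmentPayloads(
* "wikipedia",
* Some(DateTimes.of("2023-01-01T00:00:00Z").getMillis),
* Some(DateTimes.of("2023-01-02T00:00:00Z").getMillis)
* )
* }}}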
*/
def getSegmentPayloads(
datasource: String,
intervalStart: Option[Long],
intervalEnd: Option[Long],
allowIncompletePartitions: Boolean = false
): Seq[DataSegment] = {
val dbi: DBI = connector.getDBI
val interval = Intervals.utc(
intervalStart.getOrElse(JodaUtils.MIN_INSTANT), intervalEnd.getOrElse(JodaUtils.MAX_INSTANT)
)
logInfo(s"Fetching segment payloads for interval [${interval.toString}] on datasource [$datasource].")
// Druid stores segment start/end as fixed-format ISO-8601 strings, so lexicographic comparison in SQL matches
// chronological order. "end" is quoted because END is a reserved word in SQL.
val startClause = if (intervalStart.isDefined) " AND start >= :start" else ""
val endClause = if (intervalEnd.isDefined) " AND \"end\" <= :end" else ""
val allSegments = dbi.withHandle((handle: Handle) => {
val statement =
s"""
|SELECT payload FROM ${druidMetadataTableConfig.getSegmentsTable}
|WHERE datasource = :datasource AND used = true$startClause$endClause
""".stripMargin
val bindMap = Seq(Some("datasource" -> datasource),
intervalStart.map(bound => "start" -> DateTimes.utc(bound).toString("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")),
intervalEnd.map(bound => "end" -> DateTimes.utc(bound).toString("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
).flatten.toMap
val query = handle.createQuery(statement)
val result = query
.bindFromMap(bindMap.asJava)
.mapTo(classOf[Array[Byte]]).list().asScala
result.map(blob =>
MAPPER.readValue[DataSegment](
StringUtils.fromUtf8(blob), new TypeReference[DataSegment] {}
)
)
})
val activeSegments = VersionedIntervalTimeline.forSegments(allSegments.asJava).findNonOvershadowedObjectsInInterval(
interval,
if (allowIncompletePartitions) Partitions.INCOMPLETE_OK else Partitions.ONLY_COMPLETE
).asScala.toSeq
logInfo(s"Fetched payloads for ${activeSegments.size} segments for interval [${interval.toString}] on " +
s"datasource [$datasource].")
activeSegments
}
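/**
* Publish SEGMENTS to the Druid metadata store, inserting rows into the segments table so that a Druid cluster
* backed by the same metadata store will discover and load them.
*
* @param segments The list of DataSegments to publish.
*/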
def publishSegments(segments: java.util.List[DataSegment]): Unit = {
val metadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector)
metadataStorageUpdaterJobHandler.publishSegments(druidMetadataTableConfig.getSegmentsTable,
segments, MAPPER)
logInfo(s"Published ${segments.size()} segments.")
}
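/**
* Check whether DATASOURCE has at least one used segment in the metadata store's segments table.
*
* @param dataSource The Druid data source to check.
* @return True iff at least one used segment is registered for DATASOURCE.
*/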
def checkIfDataSourceExists(dataSource: String): Boolean = {
val dbi: DBI = connector.getDBI
dbi.withHandle((handle: Handle) => {
val statement =
s"""
|SELECT DISTINCT dataSource FROM ${druidMetadataTableConfig.getSegmentsTable}
|WHERE used = true AND dataSource = :dataSource
""".stripMargin
!handle.createQuery(statement).bind("dataSource", dataSource).list().isEmpty
})
}
/**
* Because this client doesn't run inside a Druid cluster, users must supply the metadata connection info
* themselves. This also means users must explicitly include the relevant metadata-connector extension jars on
* their Spark clusters.
*
* @return A SQLMetadataConnector for the configured metadata database type.
*/
private def buildSQLConnector(): SQLMetadataConnector = {
SQLConnectorRegistry.create(metadataDbType, connectorConfigSupplier, metadataTableConfigSupplier)
}
}
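/**
* Companion factory for constructing a DruidMetadataClient from a [[Configuration]]. A rough usage sketch with
* illustrative values; the Configuration construction shown is hypothetical and the exact key strings are
* defined in DruidConfigurationKeys:
* {{{
* // Hypothetical map-backed Configuration holding metadata connection properties.
* val conf = Configuration(Map(
* "metadata.dbType" -> "mysql",
* "metadata.connectUri" -> "jdbc:mysql://metadata.example.com:3306/druid",
* "metadata.user" -> "druid",
* "metadata.password" -> """{"type":"mapString","config":{"password":"secret"}}"""
* ))
* val client = DruidMetadataClient(conf)
* }}}
*/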
object DruidMetadataClient {
def apply(conf: Configuration): DruidMetadataClient = {
val metadataConf = conf.dive(DruidConfigurationKeys.metadataPrefix)
require(metadataConf.isPresent(DruidConfigurationKeys.metadataDbTypeKey),
s"Must set ${DruidConfigurationKeys.metadataPrefix}." +
s"${DruidConfigurationKeys.metadataDbTypeKey} or provide segments directly!")
val dbcpProperties = if (metadataConf.isPresent(DruidConfigurationKeys.metadataDbcpPropertiesKey)) {
MAPPER.readValue[Properties](metadataConf.getString(DruidConfigurationKeys.metadataDbcpPropertiesKey),
new TypeReference[Properties] {})
} else {
new Properties()
}
new DruidMetadataClient(
metadataConf.getAs[String](DruidConfigurationKeys.metadataDbTypeKey),
metadataConf.get(DruidConfigurationKeys.metadataHostDefaultKey),
metadataConf.getInt(DruidConfigurationKeys.metadataPortDefaultKey),
metadataConf.getString(DruidConfigurationKeys.metadataConnectUriKey),
metadataConf.getString(DruidConfigurationKeys.metadataUserKey),
metadataConf.getString(DruidConfigurationKeys.metadataPasswordKey),
dbcpProperties,
metadataConf.get(DruidConfigurationKeys.metadataBaseNameDefaultKey)
)
}
}