blob: 636ab721844cd13bf4bfaf3850689e65a5cd0b00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.spark.configuration
import org.apache.druid.java.util.common.StringUtils
import org.apache.spark.sql.sources.v2.DataSourceOptions
import scala.collection.JavaConverters.mapAsScalaMapConverter
/**
* A simple wrapper around a properties map that can also "dive" into a sub-Configuration if keys are stored using
* dotted separators. Because this class is designed to make working with Spark DataSourceOptions easier and DSOs are
* case-insensitive, this class is also case-insensitive.
*
* @param properties A Map of String property names to String property values to back this Configuration.
*/
class Configuration(properties: Map[String, String]) extends Serializable {
def getAs[T](key: String): T = {
properties(StringUtils.toLowerCase(key)).asInstanceOf[T]
}
def get(key: String): Option[String] = {
properties.get(StringUtils.toLowerCase(key))
}
def get(keyWithDefault: (String, String)): String = {
this.get(StringUtils.toLowerCase(keyWithDefault._1)).getOrElse(keyWithDefault._2)
}
def getString(key: String): String = {
properties.getOrElse(StringUtils.toLowerCase(key), "")
}
def getInt(key: String, default: Int): Int = {
properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toInt)
}
def getInt(keyWithDefault: (String, Int)): Int = {
this.getInt(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
}
def getLong(key: String, default: Long): Long = {
properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toLong)
}
def getLong(keyWithDefault: (String, Long)): Long = {
this.getLong(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
}
def getBoolean(key: String, default: Boolean): Boolean = {
properties.get(StringUtils.toLowerCase(key)).fold(default)(_.toBoolean)
}
def getBoolean(keyWithDefault: (String, Boolean)): Boolean = {
this.getBoolean(StringUtils.toLowerCase(keyWithDefault._1), keyWithDefault._2)
}
def apply(key: String): String = {
this.get(key) match {
case Some(v) => v
case None => throw new NoSuchElementException(s"Key $key not found!")
}
}
def isPresent(key: String): Boolean = {
properties.isDefinedAt(StringUtils.toLowerCase(key))
}
def isPresent(paths: String*): Boolean = {
properties.isDefinedAt(StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR)))
}
/**
* Given a prefix PREFIX, return a Configuration object containing every key in this
* configuration that starts with PREFIX. Keys in the resulting Configuration will have PREFIX
* removed from their start.
*
* For example, if a Configuration contains the key `druid.broker.host` and `dive("druid")` is
* called on it, the resulting Configuration will contain the key `broker.host` with the same
* value as `druid.broker.host` had in the original Configuration.
*
* @param prefix The namespace to "dive" into.
* @return A Configuration containing the keys in this Configuration that start with PREFIX,
* stripped of their leading PREFIX.
*/
def dive(prefix: String): Configuration = {
new Configuration(properties
.filterKeys(_.startsWith(s"$prefix${Configuration.SEPARATOR}"))
.map{case (k, v) => k.substring(prefix.length + 1) -> v})
}
/**
* Given a number of prefixes, return a Configuration object containing every key in this
* configuration that starts with `prefixes.mkString(Configuration.SEPARATOR)`. Keys in the
* resulting Configuration will have the concatenation of PREFIXES removed from their start.
*
* Note that this is the equivalent of chaining `dive` calls, not chaining `merge` calls.
*
* @param prefixes The namespaces, in order, to "dive" into.
* @return A Configuration containing the keys that start with every prefix in PREFIXES joined
* by periods, stripped of the leading prefixes matching the prefixes in PREFIXES.
*/
def dive(prefixes: String*): Configuration = {
prefixes.foldLeft(this){case (conf, prefix) => conf.dive(prefix)}
}
/**
* Combine this configuration with another Configuration. If keys collide between these
* configurations, the corresponding values in OTHER will be selected.
*
* @param other A Configuration to merge with this Configuration.
* @return A Configuration containing the union of keys between this Configuration and OTHER.
* If keys collide between the two Configurations, the values in OTHER will be kept.
*/
def merge(other: Configuration): Configuration = {
new Configuration(this.properties ++ other.toMap)
}
/**
* Combine this configuration with another Configuration, moving the other Configuration to the provided name space.
* If keys collide between this configuration and the newly-namespaced OTHER, the corresponding values in OTHER will
* be selected.
*
* @param namespace The name space to merge OTHER under.
* @param other The Configuration to merge with this Configuration.
* @return A new Configuration object containing all the keys in this Configuration, plus the keys in OTHER
* namespaced under NAMESPACE.
*/
def merge(namespace: String, other: Configuration): Configuration = {
this.merge(Configuration(namespace, other.toMap))
}
/**
* Add the properties specified in PROPERTIES to this Configuration's properties, moving the new properties to the
* provided name space. If this Configuration already contains keys under the provided name space and those keys
* collide with the properties specified in PROPERTIES, the corresponding values in PROPERTIES will be selected.
*
* @param namespace The name space to merge the properties specified in PROPERTIES under.
* @param properties The map of properties to values to combine with the properties from this Configuration.
* @return A new Configuration object containing all the keys in this Configuration, plus the properties in
* PROPERTIES namespaced under NAMESPACE.
*/
def merge(namespace: String, properties: Map[String, String]): Configuration = {
this.merge(Configuration(namespace, properties))
}
override def equals(obj: Any): Boolean = {
obj.isInstanceOf[Configuration] && this.toMap == obj.asInstanceOf[Configuration].toMap
}
override def hashCode(): Int = {
this.properties.hashCode()
}
def toMap: Map[String, String] = {
this.properties
}
override def toString: String = {
this.toMap.mkString("Configuration{", "; ", "}")
}
}
object Configuration {
def apply(properties: Map[String, String]): Configuration = {
new Configuration(properties.map{case (k, v) => StringUtils.toLowerCase(k) -> v})
}
def apply(namespace: String, properties: Map[String, String]): Configuration = {
new Configuration(properties.map{case(k, v) => StringUtils.toLowerCase(s"$namespace$SEPARATOR$k") -> v})
}
def apply(dso: DataSourceOptions): Configuration = {
new Configuration(dso.asMap().asScala.toMap)
}
def fromKeyValue(key: String, value: String): Configuration = {
new Configuration(Map[String, String](StringUtils.toLowerCase(key) -> value))
}
/**
* Get the key corresponding to each element of PATHS interpreted as a namespace or property.
*
* @param paths The parent namespaces and property as individual strings to convert into a single configuration key.
* @return The path to a property through its parent namespaces as a single configuration key.
*/
def toKey(paths: String*): String = {
StringUtils.toLowerCase(paths.mkString(Configuration.SEPARATOR))
}
val EMPTY_CONF: Configuration = Configuration(Map[String, String]())
private val SEPARATOR = "."
}