/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.spark.metrics
import java.io.{FileInputStream, InputStream}
import java.util.Properties

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.matching.Regex

import org.apache.spark.SparkConf
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.PATH
import org.apache.spark.internal.config.METRICS_CONF
import org.apache.spark.util.Utils
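
/**
 * Resolves the effective metrics configuration for a Spark process by merging, in increasing
 * order of precedence: built-in defaults, the metrics properties file, and any
 * "spark.metrics.conf."-prefixed entries in [[SparkConf]]. A minimal usage sketch (in Spark
 * itself this is driven by the metrics system rather than called directly):
 *
 * {{{
 *   val metricsConfig = new MetricsConfig(conf)
 *   metricsConfig.initialize()
 *   val driverProps = metricsConfig.getInstance("driver")
 * }}}
 */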
private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
private val DEFAULT_PREFIX = "*"
private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"
private[metrics] val properties = new Properties()
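  // Map from instance name (e.g. "driver", "master") to the Properties scoped to that
  // instance; populated by initialize() and null until then.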
private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null

  private def setDefaultProperties(prop: Properties): Unit = {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}

  /**
   * Load properties from various places, based on precedence.
   * If the same property is set again later in this method, it overwrites the previous value.
   */
def initialize(): Unit = {
// Add default properties in case there's no properties file
setDefaultProperties(properties)
loadPropertiesFromFile(conf.get(METRICS_CONF))
    // Also look for the properties in the provided Spark configuration
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
case (k, v) if k.startsWith(prefix) =>
properties.setProperty(k.substring(prefix.length()), v)
case _ =>
}
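    // For example (illustrative key), setting "spark.metrics.conf.driver.sink.csv.period" in
    // SparkConf arrives here as the property "driver.sink.csv.period", overwriting any value
    // loaded from the properties file above.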
    // Now, let's populate a list of sub-properties per instance, an instance being the prefix
    // that appears before the first dot in the property name.
    // Add to the sub-properties of each instance the default properties (those with the "*"
    // prefix), unless the instance already defines that exact sub-property.
    //
    // For example, if properties has ("*.class" -> "default_class", "*.path" -> "default_path",
    // "driver.path" -> "driver_path"), then for driver-specific sub-properties we'd like the
    // output to be ("driver" -> Map("path" -> "driver_path", "class" -> "default_class")).
    // Note how class got added based on the default property, but path remained the same,
    // since "driver.path" already existed and took precedence over "*.path".
    //
perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
(k, v) <- defaultSubProperties if (prop.get(k) == null)) {
prop.put(k, v)
}
}
}

  /**
   * Take a simple set of properties and a regex that the instance names (the part before the
   * first dot) have to conform to, and return a map from each first-order prefix (before the
   * first dot) to the sub-properties under that prefix.
   *
   * For example, if the properties passed in were Properties("*.sink.servlet.class" -> "class1",
   * "*.sink.servlet.path" -> "path1"), the returned map would be
   * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1")).
   * Note that in the sub-properties (the values of the returned map), only the suffixes are
   * used as property keys.
   * If, in the passed properties, there is only one property with a given prefix, it is still
   * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1"),
   * the returned map would contain the single key-value pair
   * Map("*" -> Properties("sink.servlet.class" -> "class1")).
   * Any passed-in properties not complying with the regex are ignored.
   *
   * @param prop the flat list of properties to "unflatten" based on prefixes
   * @param regex the regex that the prefix has to comply with
   * @return an unflattened map, mapping each prefix to the sub-properties under that prefix
   */
def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
val subProperties = new mutable.HashMap[String, Properties]
prop.asScala.foreach { kv =>
if (regex.findPrefixOf(kv._1).isDefined) {
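        // Extract the instance prefix (before the first dot) and the remaining sub-property
        // suffix; the findPrefixOf check above ensures this extractor match succeeds.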
val regex(prefix, suffix) = kv._1
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
}
}
subProperties
}
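
  /**
   * Returns the sub-properties for the given instance name. If no instance-specific entry
   * exists, falls back to the default ("*") sub-properties, or to an empty Properties if
   * there is no default either.
   */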
def getInstance(inst: String): Properties = {
perInstanceSubProperties.get(inst) match {
case Some(s) => s
case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties)
}
}

  /**
   * Loads configuration from the given config file. If no config file is provided, tries to
   * load the default metrics config file from the classpath.
   */
private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
var is: InputStream = null
try {
is = path match {
case Some(f) => new FileInputStream(f)
case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
}
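      // getResourceAsStream returns null when the default file is absent from the classpath,
      // so only load when a stream was actually opened.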
if (is != null) {
properties.load(is)
}
} catch {
case e: Exception =>
val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
logError(log"Error loading configuration file ${MDC(PATH, file)}", e)
} finally {
if (is != null) {
is.close()
}
}
}
}