blob: adc8df364049ce6ec692335a19a89dee4d83aa34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster
import java.io.File
import com.typesafe.config._
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{Constants, FileUtils, LogUtil, Util}
/**
*
* All Gearpump application should use this class to load configurations.
*
* Compared with Akka built-in com.typesafe.config.ConfigFactory, this class also
* resolve config from file gear.conf and geardefault.conf.
*
* Overriding order:
* {{{
* System Properties
* > Custom configuration file (by using system property -Dgearpump.config.file) >
* > gear.conf
* > geardefault.conf
* > reference.conf
* }}}
*/
object ClusterConfig {
/**
* alias for default
* default is a reserved word for java
*/
def defaultConfig: Config = {
default(APPLICATION)
}
/**
* default application for user.
* Usually used when user want to start an client application.
*/
def default(configFile: String = APPLICATION): Config = {
load(configFile).default
}
/**
* configuration for master node
*/
def master(configFile: String = null): Config = {
load(configFile).master
}
/**
* configuration for worker node
*/
def worker(configFile: String = null): Config = {
load(configFile).worker
}
/**
* configuration for UI server
*/
def ui(configFile: String = null): Config = {
load(configFile).ui
}
/**
* try to load system property gearpump.config.file, or use configFile
*/
private def load(configFile: String): Configs = {
val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE))
file match {
case Some(path) =>
LOG.info("loading config file " + path + "..........")
load(ClusterConfigSource(path))
case None =>
load(ClusterConfigSource(configFile))
}
}
val APPLICATION = "application.conf"
val LOG = LogUtil.getLogger(getClass)
def saveConfig(conf: Config, file: File): Unit = {
val serialized = conf.root().render()
FileUtils.write(file, serialized)
}
def render(config: Config, concise: Boolean = false): String = {
if (concise) {
config.root().render(ConfigRenderOptions.concise().setFormatted(true))
} else {
config.root().render(ConfigRenderOptions.defaults())
}
}
/** filter JVM reserved keys and akka default reference.conf */
def filterOutDefaultConfig(input: Config): Config = {
val updated = filterOutJvmReservedKeys(input)
Util.filterOutOrigin(updated, "reference.conf")
}
private[gearpump] def load(source: ClusterConfigSource): Configs = {
val systemProperties = getSystemProperties
val user = source.getConfig
val gear = ConfigFactory.parseResourcesAnySyntax("gear.conf",
ConfigParseOptions.defaults.setAllowMissing(true))
val gearDefault = ConfigFactory.parseResourcesAnySyntax("geardefault.conf",
ConfigParseOptions.defaults.setAllowMissing(true))
val all = systemProperties.withFallback(user).withFallback(gear).withFallback(gearDefault)
val linux = all.getConfig(LINUX_CONFIG)
var basic = all.withoutPath(MASTER_CONFIG).withoutPath(WORKER_CONFIG).
withoutPath(UI_CONFIG).withoutPath(LINUX_CONFIG)
if (!akka.util.Helpers.isWindows) {
// Change the akka.scheduler.tick-duration to 1 ms for Linux or Mac
basic = linux.withFallback(basic)
}
val master = replaceHost(all.getConfig(MASTER_CONFIG).withFallback(basic))
val worker = replaceHost(all.getConfig(WORKER_CONFIG).withFallback(basic))
val ui = replaceHost(all.getConfig(UI_CONFIG).withFallback(basic))
val app = replaceHost(basic)
new Configs(master, worker, ui, app)
}
private def replaceHost(config: Config): Config = {
val hostName = config.getString(Constants.GEARPUMP_HOSTNAME)
config.withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(hostName))
}
val JVM_RESERVED_PROPERTIES = List(
"os", "java", "sun", "boot", "user", "prog", "path", "line", "awt", "file"
)
private def getSystemProperties: Config = {
// Excludes default java system properties
JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { (config, property) =>
config.withoutPath(property)
}
}
class ConfigValidationException(msg: String) extends Exception(msg: String)
private def filterOutJvmReservedKeys(input: Config): Config = {
val filterJvmReservedKeys = JVM_RESERVED_PROPERTIES.foldLeft(input) { (config, key) =>
config.withoutPath(key)
}
filterJvmReservedKeys
}
protected class Configs(
val master: Config, val worker: Config, val ui: Config, val default: Config)
}