blob: c31925391ed2e25ae2b61df806553970f7408fc6 [file] [log] [blame]
/*
* Copyright (c) 2019 The StreamX Project
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.streamxhub.streamx.common.util
import org.yaml.snakeyaml.Yaml
import java.io._
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, Scanner, HashMap => JavaMap, LinkedHashMap => JavaLinkedMap}
import scala.collection.JavaConversions._
import scala.collection.mutable.{Map => MutableMap}
/**
* @author benjobs
*/
object PropertiesUtils extends Logger {
def readFile(filename: String): String = {
val file = new File(filename)
require(file.exists(), s"[StreamX] readFile: file $file does not exist")
require(file.isFile, s"[StreamX] readFile: file $file is not a normal file")
val scanner = new Scanner(file)
val buffer = new StringBuilder
while (scanner.hasNextLine) {
buffer.append(scanner.nextLine()).append("\r\n")
}
scanner.close()
buffer.toString()
}
private[this] def eachAppendYamlItem(prefix: String, k: String, v: Any, proper: collection.mutable.Map[String, String]): Map[String, String] = {
v match {
case map: JavaLinkedMap[String, Any] =>
map.flatMap(x => {
prefix match {
case "" => eachAppendYamlItem(k, x._1, x._2, proper)
case other => eachAppendYamlItem(s"$other.$k", x._1, x._2, proper)
}
}).toMap
case text =>
val value = text match {
case null => ""
case other => other.toString
}
prefix match {
case "" => proper += k -> value
case other => proper += s"$other.$k" -> value
}
proper.toMap
}
}
def fromYamlText(text: String): Map[String, String] = {
try {
val map = MutableMap[String, String]()
new Yaml()
.load(text)
.asInstanceOf[java.util.Map[String, Map[String, Any]]]
.flatMap(x => eachAppendYamlItem("", x._1, x._2, map)).toMap
} catch {
case e: IOException => throw new IllegalArgumentException(s"Failed when loading conf error:", e)
}
}
def fromPropertiesText(conf: String): Map[String, String] = {
try {
val properties = new Properties()
properties.load(new StringReader(conf))
properties.stringPropertyNames().map(k => (k, properties.getProperty(k).trim)).toMap
} catch {
case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties ", e)
}
}
/** Load Yaml present in the given file. */
def fromYamlFile(filename: String): Map[String, String] = {
val file = new File(filename)
require(file.exists(), s"[StreamX] fromYamlFile: Yaml file $file does not exist")
require(file.isFile, s"[StreamX] fromYamlFile: Yaml file $file is not a normal file")
val inputStream: InputStream = new FileInputStream(file)
try {
val map = MutableMap[String, String]()
new Yaml()
.load(inputStream)
.asInstanceOf[java.util.Map[String, Map[String, Any]]]
.flatMap(x => eachAppendYamlItem("", x._1, x._2, map)).toMap
} catch {
case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e)
} finally {
inputStream.close()
}
}
/** Load properties present in the given file. */
def fromPropertiesFile(filename: String): Map[String, String] = {
val file = new File(filename)
require(file.exists(), s"[StreamX] fromPropertiesFile: Properties file $file does not exist")
require(file.isFile, s"[StreamX] fromPropertiesFile: Properties file $file is not a normal file")
val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
try {
val properties = new Properties()
properties.load(inReader)
properties.stringPropertyNames().map(k => (k, properties.getProperty(k).trim)).toMap
} catch {
case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e)
} finally {
inReader.close()
}
}
/** Load Yaml present in the given file. */
def fromYamlFile(inputStream: InputStream): Map[String, String] = {
require(inputStream != null, s"[StreamX] fromYamlFile: Properties inputStream must not be null")
try {
val map = MutableMap[String, String]()
new Yaml()
.load(inputStream)
.asInstanceOf[java.util.Map[String, Map[String, Any]]]
.flatMap(x => eachAppendYamlItem("", x._1, x._2, map)).toMap
} catch {
case e: IOException => throw new IllegalArgumentException(s"Failed when loading yaml from inputStream", e)
} finally {
inputStream.close()
}
}
/** Load properties present in the given file. */
def fromPropertiesFile(inputStream: InputStream): Map[String, String] = {
require(inputStream != null, s"[StreamX] fromPropertiesFile: Properties inputStream must not be null")
try {
val properties = new Properties()
properties.load(inputStream)
properties.stringPropertyNames().map(k => (k, properties.getProperty(k).trim)).toMap
} catch {
case e: IOException => throw new IllegalArgumentException(s"[StreamX] Failed when loading properties from inputStream", e)
}
}
/**
*
* @param file
* @return
*/
def loadFlinkConfYaml(file: File): JavaMap[String, String] = {
require(file != null && file.exists() && file.isFile, "[StreamX] loadFlinkConfYaml: file must not be null")
loadFlinkConfYaml(org.apache.commons.io.FileUtils.readFileToString(file))
}
def loadFlinkConfYaml(yaml: String): JavaMap[String, String] = {
require(yaml != null && yaml.nonEmpty, "[StreamX] loadFlinkConfYaml: yaml must not be null")
val flinkConf = new JavaMap[String, String]()
val scanner: Scanner = new Scanner(yaml)
val lineNo: AtomicInteger = new AtomicInteger(0)
while (scanner.hasNextLine) {
val line = scanner.nextLine()
lineNo.incrementAndGet()
// 1. check for comments
val comments = line.split("#", 2)
val conf = comments(0).trim
// 2. get key and value
if (conf.nonEmpty) {
val kv = conf.split(": ", 2)
// skip line with no valid key-value pair
val key = kv(0).trim
val value = kv(1).trim
// sanity check
if (key.nonEmpty && value.nonEmpty) {
flinkConf += key -> value
} else {
logWarn(s"Error after splitting key and value in configuration ${lineNo.get()}: $line")
}
}
}
flinkConf
}
}