// blob: d1976493217b0810eee9e380e80bd8929bdf6d90 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
/// Settings for the in-memory store (the `memory_store` field of `Config`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct MemoryStoreConfig {
/// Capacity given as a size string; the unit test in this file parses a
/// value such as "1024M" with `ReadableSize::from_str`.
pub capacity: String,
/// Optional buffer ticket timeout, in seconds per the field name.
/// `MemoryStoreConfig::new` fills in `Some(5 * 60)`.
// NOTE(review): what a consumer does when this is `None` is not visible here.
pub buffer_ticket_timeout_sec: Option<i64>,
}
impl MemoryStoreConfig {
    /// Builds a config for `capacity` with the stock buffer ticket timeout
    /// of five minutes (300 seconds).
    pub fn new(capacity: String) -> Self {
        const DEFAULT_TICKET_TIMEOUT_SEC: i64 = 5 * 60;
        Self::from(capacity, DEFAULT_TICKET_TIMEOUT_SEC)
    }

    /// Builds a config for `capacity` with an explicit buffer ticket
    /// timeout, expressed in seconds.
    pub fn from(capacity: String, buffer_ticket_timeout_sec: i64) -> Self {
        let buffer_ticket_timeout_sec = Some(buffer_ticket_timeout_sec);
        Self {
            capacity,
            buffer_ticket_timeout_sec,
        }
    }
}
// =========================================================
/// Settings for the HDFS store (the `hdfs_store` field of `Config`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct HdfsStoreConfig {
/// Optional concurrency limit.
// NOTE(review): presumably bounds concurrent HDFS operations — confirm
// against the store implementation, which is not part of this file.
pub max_concurrency: Option<i32>,
}
// =========================================================
/// Settings for the local-file store (the `localfile_store` field of `Config`).
///
/// Only `data_paths` is mandatory; `LocalfileStoreConfig::new` leaves every
/// optional field as `None`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LocalfileStoreConfig {
/// Paths used by the store; the unit test in this file passes a single
/// directory, "/data1/uniffle".
pub data_paths: Vec<String>,
// NOTE(review): the optional fields below are only declared here; their
// units, valid ranges and fallbacks are decided by the consumer — confirm.
pub healthy_check_min_disks: Option<i32>,
pub disk_high_watermark: Option<f32>,
pub disk_low_watermark: Option<f32>,
pub disk_max_concurrency: Option<i32>,
}
impl LocalfileStoreConfig {
pub fn new(data_paths: Vec<String>) -> Self {
LocalfileStoreConfig {
data_paths,
healthy_check_min_disks: None,
disk_high_watermark: None,
disk_low_watermark: None,
disk_max_concurrency: None,
}
}
}
// =========================================================
/// Thread-count settings (the `runtime_config` field of `Config`).
///
/// The container-level `#[serde(default)]` means any key missing from the
/// input is filled from `RuntimeConfig::default()`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RuntimeConfig {
// NOTE(review): each value presumably sizes the thread pool named by its
// prefix — confirm where the runtimes are actually built.
pub read_thread_num: usize,
pub write_thread_num: usize,
pub grpc_thread_num: usize,
pub http_thread_num: usize,
pub default_thread_num: usize,
}
impl Default for RuntimeConfig {
fn default() -> Self {
RuntimeConfig {
read_thread_num: 10,
write_thread_num: 40,
grpc_thread_num: 100,
http_thread_num: 5,
default_thread_num: 5,
}
}
}
// =========================================================
/// Settings for the hybrid store (the `hybrid_store` field of `Config`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct HybridStoreConfig {
// NOTE(review): the two watermarks look like memory-usage ratios (the
// `Default` impl uses 0.8 and 0.7) — confirm against the spill logic.
pub memory_spill_high_watermark: f32,
pub memory_spill_low_watermark: f32,
/// Optional size string; the unit test in this file supplies "256M".
pub memory_single_buffer_max_spill_size: Option<String>,
// NOTE(review): presumably a size string in the same format as the field
// above — verify with the code that reads it.
pub memory_spill_to_cold_threshold_size: Option<String>,
pub memory_spill_max_concurrency: Option<i32>,
}
impl HybridStoreConfig {
pub fn new(
memory_spill_high_watermark: f32,
memory_spill_low_watermark: f32,
memory_single_buffer_max_spill_size: Option<String>,
) -> Self {
HybridStoreConfig {
memory_spill_high_watermark,
memory_spill_low_watermark,
memory_single_buffer_max_spill_size,
memory_spill_to_cold_threshold_size: None,
memory_spill_max_concurrency: None,
}
}
}
impl Default for HybridStoreConfig {
fn default() -> Self {
HybridStoreConfig {
memory_spill_high_watermark: 0.8,
memory_spill_low_watermark: 0.7,
memory_single_buffer_max_spill_size: None,
memory_spill_to_cold_threshold_size: None,
memory_spill_max_concurrency: None,
}
}
}
fn as_default_runtime_config() -> RuntimeConfig {
RuntimeConfig::default()
}
/// Top-level worker configuration, deserialized from TOML by `Config::from`.
///
/// `Option` fields may be omitted from the input. `coordinator_quorum`
/// declares no serde default, so deserialization needs that key present.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Config {
// Per-store sections; each is optional at the deserialization level.
pub memory_store: Option<MemoryStoreConfig>,
pub localfile_store: Option<LocalfileStoreConfig>,
pub hybrid_store: Option<HybridStoreConfig>,
pub hdfs_store: Option<HdfsStoreConfig>,
/// Selected storage combination, written as a variant name such as
/// "MEMORY_LOCALFILE" (see the unit test in this file).
pub store_type: Option<StorageType>,
/// Falls back to `RuntimeConfig::default()` when the section is missing.
#[serde(default = "as_default_runtime_config")]
pub runtime_config: RuntimeConfig,
pub metrics: Option<MetricsConfig>,
pub grpc_port: Option<i32>,
// NOTE(review): presumably coordinator addresses — the test only uses a
// placeholder value, so the expected format is not visible here.
pub coordinator_quorum: Vec<String>,
pub tags: Option<Vec<String>>,
pub log: Option<LogConfig>,
// NOTE(review): the remaining fields are only declared in this file; units
// (minutes per the name, size string, percent) should be confirmed with
// the code that reads them.
pub app_heartbeat_timeout_min: Option<u32>,
pub huge_partition_marked_threshold: Option<String>,
pub huge_partition_memory_max_used_percent: Option<f64>,
pub http_monitor_service_port: Option<u16>,
}
// =========================================================
/// Metrics settings (the `metrics` field of `Config`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct MetricsConfig {
// NOTE(review): presumably the push-gateway address metrics are pushed to
// and the push period in seconds — confirm with the metrics reporter.
pub push_gateway_endpoint: Option<String>,
pub push_interval_sec: Option<u32>,
}
// =========================================================
/// Logging settings (the `log` field of `Config`).
///
/// The `Default` impl in this file uses the path "/tmp/" with hourly rotation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LogConfig {
// NOTE(review): likely the log output location — whether a directory or a
// file is expected is not visible here (the default is "/tmp/").
pub path: String,
/// Rotation choice; see `RotationConfig`.
pub rotation: RotationConfig,
}
impl Default for LogConfig {
    /// Defaults to the path "/tmp/" with hourly rotation.
    fn default() -> Self {
        let path = String::from("/tmp/");
        let rotation = RotationConfig::Hourly;
        Self { path, rotation }
    }
}
/// Rotation choice carried by `LogConfig::rotation`.
// NOTE(review): presumably how often log files are rolled over; the code
// that acts on it is outside this file.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum RotationConfig {
Hourly,
Daily,
Never,
}
// =========================================================
/// Storage combinations selectable through `Config::store_type`.
///
/// The discriminants form a bit mask: `MEMORY` = 1, `LOCALFILE` = 2 and
/// `HDFS` = 4. Each combined variant carries the bitwise OR of its parts,
/// which is what the `contains_*` helpers test.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Copy)]
// Variant names stay in upper snake case because they are the names used
// in config input (e.g. `store_type = "MEMORY_LOCALFILE"` in the unit test).
#[allow(non_camel_case_types)]
pub enum StorageType {
MEMORY = 1,
LOCALFILE = 2,
MEMORY_LOCALFILE = 3,
HDFS = 4,
MEMORY_HDFS = 5,
MEMORY_LOCALFILE_HDFS = 7,
}
impl StorageType {
    /// Returns `true` when `storage_type` shares at least one bit with `flag`.
    ///
    /// Both values are compared through their `u8` discriminants.
    fn has_flag(storage_type: &StorageType, flag: StorageType) -> bool {
        (*storage_type as u8) & (flag as u8) != 0
    }

    /// Returns `true` if `storage_type` includes the `LOCALFILE` bit.
    pub fn contains_localfile(storage_type: &StorageType) -> bool {
        Self::has_flag(storage_type, StorageType::LOCALFILE)
    }

    /// Returns `true` if `storage_type` includes the `MEMORY` bit.
    pub fn contains_memory(storage_type: &StorageType) -> bool {
        Self::has_flag(storage_type, StorageType::MEMORY)
    }

    /// Returns `true` if `storage_type` includes the `HDFS` bit.
    pub fn contains_hdfs(storage_type: &StorageType) -> bool {
        Self::has_flag(storage_type, StorageType::HDFS)
    }
}
/// Name of the environment variable from which `Config::create_from_env`
/// reads the config file path.
const CONFIG_FILE_PATH_KEY: &str = "WORKER_CONFIG_PATH";
impl Config {
    /// Loads a `Config` from the TOML file at `cfg_path`.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be read or if its content does not
    /// deserialize into a `Config`. Both messages name the offending path
    /// and include the underlying error.
    pub fn from(cfg_path: &str) -> Self {
        let path = Path::new(cfg_path);
        let file_content = fs::read_to_string(path)
            .unwrap_or_else(|e| panic!("Failed to read file [{}]: {}", cfg_path, e));
        toml::from_str(&file_content)
            .unwrap_or_else(|e| panic!("Failed to parse config file [{}]: {}", cfg_path, e))
    }

    /// Loads a `Config` from the file named by the `WORKER_CONFIG_PATH`
    /// environment variable.
    ///
    /// # Panics
    ///
    /// Panics if the variable cannot be read (unset or not valid unicode),
    /// and under the same conditions as `Config::from`.
    pub fn create_from_env() -> Config {
        let path = std::env::var(CONFIG_FILE_PATH_KEY).unwrap_or_else(|e| {
            // Keep the original message and append the cause, so an unset
            // variable can be told apart from a non-unicode value.
            panic!(
                "config path must be set in env args. key: {} ({})",
                CONFIG_FILE_PATH_KEY, e
            )
        });
        Config::from(&path)
    }
}
#[cfg(test)]
mod test {
    use crate::config::{Config, RuntimeConfig, StorageType};
    use crate::readable_size::ReadableSize;
    use std::str::FromStr;

    /// The `contains_*` helpers must follow the bit-mask discriminants.
    #[test]
    fn storage_type_test() {
        let stype = StorageType::MEMORY_LOCALFILE;
        assert!(StorageType::contains_localfile(&stype));
        assert!(StorageType::contains_memory(&stype));
        assert!(!StorageType::contains_hdfs(&stype));

        let stype = StorageType::MEMORY_LOCALFILE_HDFS;
        assert!(StorageType::contains_hdfs(&stype));
    }

    /// A minimal TOML document deserializes, and an omitted `runtime_config`
    /// section falls back to `RuntimeConfig::default()`.
    #[test]
    fn config_test() {
        let toml_str = r#"
store_type = "MEMORY_LOCALFILE"
coordinator_quorum = ["xxxxxxx"]
[memory_store]
capacity = "1024M"
[localfile_store]
data_paths = ["/data1/uniffle"]
[hybrid_store]
memory_spill_high_watermark = 0.8
memory_spill_low_watermark = 0.2
memory_single_buffer_max_spill_size = "256M"
"#;
        let decoded: Config = toml::from_str(toml_str).unwrap();

        let capacity = ReadableSize::from_str(&decoded.memory_store.unwrap().capacity).unwrap();
        assert_eq!(1024 * 1024 * 1024, capacity.as_bytes());

        assert_eq!(
            decoded.runtime_config.read_thread_num,
            RuntimeConfig::default().read_thread_num
        );
    }
}