blob: 40b661cf6e11f3199a33b191414541f6b64c1574 [file] [log] [blame]
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
package config
import (
"flag"
"fmt"
"os"
"strings"
"time"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/caarlos0/env/v6"
"github.com/pelletier/go-toml/v2"
"github.com/pkg/errors"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap"
)
// Built-in default values. MakeConfigParser copies them into a fresh Config.
// Some constants carry an explicit type and others are untyped; the untyped
// ones take the type of the Config field they are assigned to.
const (
// Default for Config.GrpcHandleTimeoutMs, in milliseconds.
defaultGrpcHandleTimeoutMs int64 = 60 * 1000
// Defaults for the flow limiter (see LimiterConfig): burst capacity, fill
// rate and the on/off switch.
defaultInitialLimiterCapacity int = 100 * 1000
defaultInitialLimiterRate int = 10 * 1000
defaultEnableLimiter bool = false
// Defaults for Config.EtcdStartTimeoutMs and Config.EtcdCallTimeoutMs, in milliseconds.
defaultEtcdStartTimeoutMs int64 = 60 * 1000
defaultCallTimeoutMs = 5 * 1000
// Default for Config.EtcdMaxTxnOps.
defaultMaxTxnOps = 128
// Default for Config.LeaseTTLSec.
defaultEtcdLeaseTTLSec = 10
// Prefix of the generated node name, see makeDefaultNodeName.
defaultNodeNamePrefix = "ceresmeta"
// Default for Config.StorageRootPath.
defaultRootPath = "/ceresdb"
// Default listen URLs; they are also used as the default advertise URLs.
defaultClientUrls = "http://0.0.0.0:2379"
defaultPeerUrls = "http://0.0.0.0:2380"
// Defaults for the etcd cluster bootstrap settings.
defaultInitialClusterState = embed.ClusterStateFlagNew
defaultInitialClusterToken = "ceresmeta-cluster" //#nosec G101
// Defaults for etcd auto compaction.
defaultCompactionMode = "periodic"
defaultAutoCompactionRetention = "1h"
// Defaults for Config.TickIntervalMs and Config.ElectionTimeoutMs, in milliseconds.
defaultTickIntervalMs int64 = 500
defaultElectionTimeoutMs = 3000
defaultQuotaBackendBytes = 8 * 1024 * 1024 * 1024 // 8GB
defaultMaxRequestBytes uint = 2 * 1024 * 1024 // 2MB
// Defaults for Config.MaxScanLimit, Config.MinScanLimit and Config.IDAllocatorStep.
defaultMaxScanLimit int = 100
defaultMinScanLimit int = 20
defaultIDAllocatorStep uint = 20
// Defaults describing the default cluster.
defaultClusterName = "defaultCluster"
defaultClusterNodeCount = 2
defaultClusterReplicationFactor = 1
defaultClusterShardTotal = 8
// enableSchedule is the default for Config.EnableSchedule (note that it does
// not carry the "default" prefix used by the other constants).
enableSchedule = true
// defaultTopologyType is the default for Config.TopologyType, which determines
// the strategy CeresMeta uses to schedule the cluster. It should be chosen
// according to the storage method of CeresDB; the default is "static" to
// support local storage.
defaultTopologyType = "static"
// Default for Config.HTTPPort.
defaultHTTPPort = 8080
// Default for Config.DataDir.
defaultDataDir = "/tmp/ceresmeta"
// Suffixes appended to Config.DataDir by GenEtcdConfig to locate the etcd
// data directory, the WAL directory and the etcd log file.
defaultEtcdDataDir = "/etcd"
defaultWalDir = "/wal"
defaultEtcdLogFile = "/etcd.log"
// Log file returned by GenLogConfigConfig; it is used as-is there, without
// being joined to Config.DataDir.
defaultLogFile = "/ceresmeta.log"
)
// LimiterConfig holds the settings of the token-bucket flow limiter.
// Each field can be set from the toml file (key in the `toml` tag) or from the
// environment variable named in the `env` tag.
type LimiterConfig struct {
// TokenBucketFillRate is the rate at which the limiter tokens are updated.
TokenBucketFillRate int `toml:"token-bucket-fill-rate" env:"FLOW_LIMITER_TOKEN_BUCKET_FILL_RATE"`
// TokenBucketBurstEventCapacity is the capacity of the limiter token bucket.
TokenBucketBurstEventCapacity int `toml:"token-bucket-burst-event-capacity" env:"FLOW_LIMITER_TOKEN_BUCKET_BURST_EVENT_CAPACITY"`
// Enable is used to control the switch of the limiter.
Enable bool `toml:"enable" env:"FLOW_LIMITER_ENABLE"`
}
// Config is the server start config. It has two input modes:
//  1. toml config file
//  2. env variables
//
// Their loading has priority, and low priority configurations will be overwritten by high priority configurations.
// The priority from high to low is: env variables > toml config file.
//
// Every field names its toml key in the `toml` tag and its environment
// variable in the `env` tag.
type Config struct {
	// Log carries the log settings of CeresMeta itself, EtcdLog those of the
	// embedded etcd, and FlowLimiter the flow limiter settings.
	Log         log.Config    `toml:"log" env:"LOG"`
	EtcdLog     log.Config    `toml:"etcd-log" env:"ETCD_LOG"`
	FlowLimiter LimiterConfig `toml:"flow-limiter" env:"FLOW_LIMITER"`

	// Timeouts in milliseconds; GrpcHandleTimeout, EtcdStartTimeout and
	// EtcdCallTimeout expose them as time.Duration.
	GrpcHandleTimeoutMs int64 `toml:"grpc-handle-timeout-ms" env:"GRPC_HANDLER_TIMEOUT_MS"`
	EtcdStartTimeoutMs  int64 `toml:"etcd-start-timeout-ms" env:"ETCD_START_TIMEOUT_MS"`
	EtcdCallTimeoutMs   int64 `toml:"etcd-call-timeout-ms" env:"ETCD_CALL_TIMEOUT_MS"`

	// EtcdMaxTxnOps is handed to the embedded etcd as its MaxTxnOps.
	EtcdMaxTxnOps int64 `toml:"etcd-max-txn-ops" env:"ETCD_MAX_TXN_OPS"`
	LeaseTTLSec   int64 `toml:"lease-sec" env:"LEASE_SEC"`

	// NodeName is used as the etcd member name; DataDir is the directory under
	// which the etcd data, WAL and etcd log file are placed.
	NodeName            string `toml:"node-name" env:"NODE_NAME"`
	DataDir             string `toml:"data-dir" env:"DATA_DIR"`
	StorageRootPath     string `toml:"storage-root-path" env:"STORAGE_ROOT_PATH"`
	InitialCluster      string `toml:"initial-cluster" env:"INITIAL_CLUSTER"`
	InitialClusterState string `toml:"initial-cluster-state" env:"INITIAL_CLUSTER_STATE"`
	InitialClusterToken string `toml:"initial-cluster-token" env:"INITIAL_CLUSTER_TOKEN"`

	// TickIntervalMs is the interval for etcd Raft tick.
	TickIntervalMs    int64 `toml:"tick-interval-ms" env:"TICK_INTERVAL_MS"`
	ElectionTimeoutMs int64 `toml:"election-timeout-ms" env:"ELECTION_TIMEOUT_MS"`

	// QuotaBackendBytes raises alarms when the backend size exceeds the given quota.
	// For etcd, 0 means use its default quota; etcd's default size is 2GB and
	// the maximum is 8GB. MakeConfigParser pre-populates this field with 8GB.
	QuotaBackendBytes int64 `toml:"quota-backend-bytes" env:"QUOTA_BACKEND_BYTES"`

	// AutoCompactionMode is either 'periodic' or 'revision'. The default value is 'periodic'.
	// The environment variable is AUTO_COMPACTION_MODE; it is spelled with
	// underscores like every other variable, because a hyphenated name cannot
	// be exported from a POSIX shell.
	AutoCompactionMode string `toml:"auto-compaction-mode" env:"AUTO_COMPACTION_MODE"`

	// AutoCompactionRetention is either duration string with time unit
	// (e.g. '5m' for 5-minute), or revision unit (e.g. '5000').
	// If no time unit is provided and compaction mode is 'periodic',
	// the unit defaults to hour. For example, '5' translates into 5-hour.
	// The default retention is 1 hour.
	AutoCompactionRetention string `toml:"auto-compaction-retention" env:"AUTO_COMPACTION_RETENTION"`

	MaxRequestBytes uint `toml:"max-request-bytes" env:"MAX_REQUEST_BYTES"`
	MaxScanLimit    int  `toml:"max-scan-limit" env:"MAX_SCAN_LIMIT"`
	MinScanLimit    int  `toml:"min-scan-limit" env:"MIN_SCAN_LIMIT"`
	IDAllocatorStep uint `toml:"id-allocator-step" env:"ID_ALLOCATOR_STEP"`

	// Following fields are the settings for the default cluster.
	DefaultClusterName              string `toml:"default-cluster-name" env:"DEFAULT_CLUSTER_NAME"`
	DefaultClusterNodeCount         int    `toml:"default-cluster-node-count" env:"DEFAULT_CLUSTER_NODE_COUNT"`
	DefaultClusterReplicationFactor int    `toml:"default-cluster-replication-factor" env:"DEFAULT_CLUSTER_REPLICATION_FACTOR"`
	DefaultClusterShardTotal        int    `toml:"default-cluster-shard-total" env:"DEFAULT_CLUSTER_SHARD_TOTAL"`

	// When the EnableSchedule is turned on, the failover scheduling will be turned on, which is used for CeresDB cluster publishing and using local storage.
	EnableSchedule bool `toml:"enable-schedule" env:"ENABLE_SCHEDULE"`
	// TopologyType indicates the schedule type used by the CeresDB cluster, it will determine the strategy of CeresMeta scheduling cluster.
	TopologyType string `toml:"topology-type" env:"TOPOLOGY_TYPE"`

	// Listen and advertise URLs of the embedded etcd, plus the HTTP port.
	ClientUrls          string `toml:"client-urls" env:"CLIENT_URLS"`
	PeerUrls            string `toml:"peer-urls" env:"PEER_URLS"`
	AdvertiseClientUrls string `toml:"advertise-client-urls" env:"ADVERTISE_CLIENT_URLS"`
	AdvertisePeerUrls   string `toml:"advertise-peer-urls" env:"ADVERTISE_PEER_URLS"`
	HTTPPort            int    `toml:"default-http-port" env:"DEFAULT_HTTP_PORT"`
}
// GrpcHandleTimeout returns GrpcHandleTimeoutMs, which is expressed in
// milliseconds, as a time.Duration.
func (c *Config) GrpcHandleTimeout() time.Duration {
	ms := time.Duration(c.GrpcHandleTimeoutMs)
	return ms * time.Millisecond
}
// EtcdStartTimeout returns EtcdStartTimeoutMs, which is expressed in
// milliseconds, as a time.Duration.
func (c *Config) EtcdStartTimeout() time.Duration {
	ms := time.Duration(c.EtcdStartTimeoutMs)
	return ms * time.Millisecond
}
// EtcdCallTimeout returns EtcdCallTimeoutMs, which is expressed in
// milliseconds, as a time.Duration.
func (c *Config) EtcdCallTimeout() time.Duration {
	ms := time.Duration(c.EtcdCallTimeoutMs)
	return ms * time.Millisecond
}
// ValidateAndAdjust validates the config fields and adjusts some fields which should be adjusted.
// Return error if any field is invalid.
//
// NOTE: the current implementation is a placeholder. It performs no
// validation and no adjustment, and always returns nil.
func (c *Config) ValidateAndAdjust() error {
return nil
}
// GenEtcdConfig builds the configuration of the embedded etcd server from c.
//
// It starts from embed.NewConfig() and overrides the member name, the data and
// WAL directories and the log output (all located under c.DataDir), the
// cluster bootstrap settings, the raft timing, the compaction and size limits,
// and the listen/advertise URLs. Pprof is always enabled and the logger is
// always "zap"; the etcd log level comes from c.EtcdLog.
//
// An error is returned when TickIntervalMs, ElectionTimeoutMs or EtcdMaxTxnOps
// is negative, or when parseUrls rejects one of the URL settings.
func (c *Config) GenEtcdConfig() (*embed.Config, error) {
	// These fields are int64 in Config but uint in embed.Config. A plain
	// conversion would silently turn a negative value into a huge one.
	tickMs, err := toEtcdUint("tick-interval-ms", c.TickIntervalMs)
	if err != nil {
		return nil, err
	}
	electionMs, err := toEtcdUint("election-timeout-ms", c.ElectionTimeoutMs)
	if err != nil {
		return nil, err
	}
	maxTxnOps, err := toEtcdUint("etcd-max-txn-ops", c.EtcdMaxTxnOps)
	if err != nil {
		return nil, err
	}

	// The default* suffixes already start with "/", so drop trailing slashes
	// from the data dir to avoid producing paths such as "/data//etcd".
	dataDir := strings.TrimRight(c.DataDir, "/")

	cfg := embed.NewConfig()
	cfg.Name = c.NodeName
	cfg.Dir = dataDir + defaultEtcdDataDir
	cfg.WalDir = dataDir + defaultWalDir
	cfg.InitialCluster = c.InitialCluster
	cfg.ClusterState = c.InitialClusterState
	cfg.InitialClusterToken = c.InitialClusterToken
	cfg.EnablePprof = true
	cfg.TickMs = tickMs
	cfg.ElectionMs = electionMs
	cfg.AutoCompactionMode = c.AutoCompactionMode
	cfg.AutoCompactionRetention = c.AutoCompactionRetention
	cfg.QuotaBackendBytes = c.QuotaBackendBytes
	cfg.MaxRequestBytes = c.MaxRequestBytes
	cfg.MaxTxnOps = maxTxnOps

	if cfg.LPUrls, err = parseUrls(c.PeerUrls); err != nil {
		return nil, err
	}
	if cfg.APUrls, err = parseUrls(c.AdvertisePeerUrls); err != nil {
		return nil, err
	}
	if cfg.LCUrls, err = parseUrls(c.ClientUrls); err != nil {
		return nil, err
	}
	if cfg.ACUrls, err = parseUrls(c.AdvertiseClientUrls); err != nil {
		return nil, err
	}

	cfg.Logger = "zap"
	cfg.LogOutputs = []string{dataDir + defaultEtcdLogFile}
	cfg.LogLevel = c.EtcdLog.Level

	return cfg, nil
}

// toEtcdUint converts a signed config value into the uint used by embed.Config.
// It rejects negative values instead of letting them wrap around; name is the
// toml key reported in the error.
func toEtcdUint(name string, v int64) (uint, error) {
	if v < 0 {
		return 0, fmt.Errorf("invalid config %s: %d, must not be negative", name, v)
	}
	return uint(v), nil
}
// GenLogConfigConfig returns the log settings for CeresMeta itself: the level
// is taken from c.Log and the file is always defaultLogFile.
// NOTE(review): c.Log.File and c.DataDir are not consulted here — confirm
// that this is intended.
func (c *Config) GenLogConfigConfig() log.Config {
	var logCfg log.Config
	logCfg.Level = c.Log.Level
	logCfg.File = defaultLogFile
	return logCfg
}
// Parser builds the config from the flags.
// It is created by MakeConfigParser.
type Parser struct {
// flagSet is the command-line flag set that Parse runs over the arguments.
flagSet *flag.FlagSet
// cfg is the config being built; Parse returns it and the ParseConfigFrom*
// methods fill it in place.
cfg *Config
// configFilePath is the toml file path bound to the "config" flag; empty
// means no config file was given.
configFilePath string
}
// Parse runs the parser's flag set over arguments and returns the config held
// by the parser.
//
// A help request (flag.ErrHelp) is reported as ErrHelpRequested; any other
// flag error is reported as ErrInvalidCommandArgs.
func (p *Parser) Parse(arguments []string) (*Config, error) {
	parseErr := p.flagSet.Parse(arguments)
	switch {
	case parseErr == nil:
		return p.cfg, nil
	case parseErr == flag.ErrHelp:
		return nil, ErrHelpRequested.WithCause(parseErr)
	default:
		return nil, ErrInvalidCommandArgs.WithCausef("fail to parse flag arguments:%v, err:%v", arguments, parseErr)
	}
}
// makeDefaultNodeName derives the default node name as
// "<defaultNodeNamePrefix>-<hostname>".
// It returns ErrRetrieveHostname (wrapping the cause) when the hostname cannot
// be looked up.
func makeDefaultNodeName() (string, error) {
	hostname, lookupErr := os.Hostname()
	if lookupErr != nil {
		return "", ErrRetrieveHostname.WithCause(lookupErr)
	}

	nodeName := fmt.Sprintf("%s-%s", defaultNodeNamePrefix, hostname)
	return nodeName, nil
}
// makeDefaultInitialCluster returns the default initial-cluster setting for a
// single member: "<nodeName>=<defaultPeerUrls>".
func makeDefaultInitialCluster(nodeName string) string {
	peerURLs := defaultPeerUrls
	return fmt.Sprintf("%s=%s", nodeName, peerURLs)
}
// MakeConfigParser creates a Parser whose Config is pre-populated with the
// built-in defaults and whose flag set ("meta", ContinueOnError) accepts a
// single "config" flag holding the toml config file path.
//
// It fails only when the default node name cannot be derived, see
// makeDefaultNodeName.
func MakeConfigParser() (*Parser, error) {
	nodeName, err := makeDefaultNodeName()
	if err != nil {
		return nil, err
	}

	defaults := &Config{
		Log: log.Config{
			Level: log.DefaultLogLevel,
			File:  log.DefaultLogFile,
		},
		EtcdLog: log.Config{
			Level: log.DefaultLogLevel,
			File:  log.DefaultLogFile,
		},
		FlowLimiter: LimiterConfig{
			TokenBucketFillRate:           defaultInitialLimiterRate,
			TokenBucketBurstEventCapacity: defaultInitialLimiterCapacity,
			Enable:                        defaultEnableLimiter,
		},

		// Timeouts and etcd client limits.
		GrpcHandleTimeoutMs: defaultGrpcHandleTimeoutMs,
		EtcdStartTimeoutMs:  defaultEtcdStartTimeoutMs,
		EtcdCallTimeoutMs:   defaultCallTimeoutMs,
		EtcdMaxTxnOps:       defaultMaxTxnOps,
		LeaseTTLSec:         defaultEtcdLeaseTTLSec,

		// Node identity and storage locations.
		NodeName:        nodeName,
		DataDir:         defaultDataDir,
		StorageRootPath: defaultRootPath,

		// Cluster bootstrap: by default a single member, this node.
		InitialCluster:      makeDefaultInitialCluster(nodeName),
		InitialClusterState: defaultInitialClusterState,
		InitialClusterToken: defaultInitialClusterToken,

		// The advertise URLs default to the listen URLs.
		ClientUrls:          defaultClientUrls,
		AdvertiseClientUrls: defaultClientUrls,
		PeerUrls:            defaultPeerUrls,
		AdvertisePeerUrls:   defaultPeerUrls,

		// Embedded etcd tuning.
		TickIntervalMs:          defaultTickIntervalMs,
		ElectionTimeoutMs:       defaultElectionTimeoutMs,
		QuotaBackendBytes:       defaultQuotaBackendBytes,
		AutoCompactionMode:      defaultCompactionMode,
		AutoCompactionRetention: defaultAutoCompactionRetention,
		MaxRequestBytes:         defaultMaxRequestBytes,

		MaxScanLimit:    defaultMaxScanLimit,
		MinScanLimit:    defaultMinScanLimit,
		IDAllocatorStep: defaultIDAllocatorStep,

		// Default cluster and scheduling.
		DefaultClusterName:              defaultClusterName,
		DefaultClusterNodeCount:         defaultClusterNodeCount,
		DefaultClusterReplicationFactor: defaultClusterReplicationFactor,
		DefaultClusterShardTotal:        defaultClusterShardTotal,
		EnableSchedule:                  enableSchedule,
		TopologyType:                    defaultTopologyType,

		HTTPPort: defaultHTTPPort,
	}

	parser := &Parser{
		flagSet: flag.NewFlagSet("meta", flag.ContinueOnError),
		cfg:     defaults,
	}
	parser.flagSet.StringVar(&parser.configFilePath, "config", "", "config file path")

	return parser, nil
}
// ParseConfigFromToml reads the configuration from the toml file given by the
// "config" flag and unmarshals it into the parser's config. Items present in
// the file overwrite the values already held by the config.
//
// When no config file was specified the call is a no-op. Errors from reading
// or unmarshalling the file are logged and returned with the file path added
// to the message.
func (p *Parser) ParseConfigFromToml() error {
	cfgPath := p.configFilePath
	if cfgPath == "" {
		log.Info("no config file specified, skip parse config")
		return nil
	}
	log.Info("get config from toml", zap.String("configFile", cfgPath))

	content, err := os.ReadFile(cfgPath)
	if err != nil {
		log.Error("err", zap.Error(err))
		return errors.WithMessagef(err, "read config file, configFile:%s", cfgPath)
	}
	log.Info("toml config value", zap.String("config", string(content)))

	if err := toml.Unmarshal(content, p.cfg); err != nil {
		log.Error("err", zap.Error(err))
		return errors.WithMessagef(err, "unmarshal toml config, configFile:%s", cfgPath)
	}

	return nil
}
// ParseConfigFromEnv fills the parser's config from the environment variables
// named in the `env` struct tags. A parse failure is returned with a message
// describing the step.
func (p *Parser) ParseConfigFromEnv() error {
	if err := env.Parse(p.cfg); err != nil {
		return errors.WithMessage(err, "parse config from env variables")
	}
	return nil
}