| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "config.h" |
| |
| #include <fmt/format.h> |
| #include <rocksdb/env.h> |
| #include <spdlog/spdlog.h> |
| #include <strings.h> |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <cstring> |
| #include <fstream> |
| #include <iostream> |
| #include <iterator> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/string_util.h" |
| #include "config_type.h" |
| #include "config_util.h" |
| #include "parse_util.h" |
| #include "rocksdb/compression_type.h" |
| #include "server/server.h" |
| #include "status.h" |
| #include "storage/redis_metadata.h" |
| |
// Built-in defaults: the values registered below for the `dir`, `backup-dir`
// and `pidfile` options, plus the default bind address.
constexpr const char *kDefaultDir = "/tmp/kvrocks";
constexpr const char *kDefaultBackupDir = "/tmp/kvrocks/backup";
constexpr const char *kDefaultPidfile = "/tmp/kvrocks/kvrocks.pid";
constexpr const char *kDefaultBindAddress = "127.0.0.1";

// Error messages reported when an option is changed while the option it
// depends on has not been turned on yet.
constexpr const char *errBlobDbNotEnabled = "Must set rocksdb.enable_blob_files to yes first.";
constexpr const char *errLevelCompactionDynamicLevelBytesNotSet =
    "Must set rocksdb.level_compaction_dynamic_level_bytes yes first.";
| |
| const std::vector<ConfigEnum<SupervisedMode>> supervised_modes{ |
| {"no", kSupervisedNone}, |
| {"auto", kSupervisedAutoDetect}, |
| {"upstart", kSupervisedUpStart}, |
| {"systemd", kSupervisedSystemd}, |
| }; |
| |
| const std::vector<ConfigEnum<JsonStorageFormat>> json_storage_formats{{"json", JsonStorageFormat::JSON}, |
| {"cbor", JsonStorageFormat::CBOR}}; |
| |
| const std::vector<ConfigEnum<rocksdb::CompressionType>> compression_types{[] { |
| std::vector<ConfigEnum<rocksdb::CompressionType>> res; |
| res.reserve(engine::CompressionOptions.size()); |
| for (const auto &e : engine::CompressionOptions) { |
| res.push_back({e.name, e.type}); |
| } |
| return res; |
| }()}; |
| |
| const std::vector<ConfigEnum<rocksdb::CompressionType>> wal_compression_types{[] { |
| std::vector<ConfigEnum<rocksdb::CompressionType>> res; |
| res.reserve(engine::WalCompressionOptions.size()); |
| for (const auto &e : engine::WalCompressionOptions) { |
| res.push_back({e.name, e.type}); |
| } |
| return res; |
| }()}; |
| |
| const std::vector<ConfigEnum<BlockCacheType>> cache_types{[] { |
| std::vector<ConfigEnum<BlockCacheType>> res; |
| res.reserve(engine::CacheOptions.size()); |
| for (const auto &e : engine::CacheOptions) { |
| res.push_back({e.name, e.type}); |
| } |
| return res; |
| }()}; |
| |
| const std::vector<ConfigEnum<MigrationType>> migration_types{{"redis-command", MigrationType::kRedisCommand}, |
| {"raw-key-value", MigrationType::kRawKeyValue}}; |
| |
| std::string TrimRocksDbPrefix(std::string s) { |
| constexpr std::string_view prefix = "rocksdb."; |
| if (!util::StartsWithICase(s, prefix)) return s; |
| return s.substr(prefix.size()); |
| } |
| |
| Status SetRocksdbCompression(Server *srv, const rocksdb::CompressionType compression, |
| const size_t compression_start_level) { |
| if (!srv) return Status::OK(); |
| std::string compression_option; |
| for (auto &option : engine::CompressionOptions) { |
| if (option.type == compression) { |
| compression_option = option.val; |
| break; |
| } |
| } |
| if (compression_option.empty()) { |
| return {Status::NotOK, "Invalid compression type"}; |
| } |
| |
| if (compression_start_level >= KVROCKS_MAX_LSM_LEVEL) { |
| return {Status::NotOK, "compression_start_level must be < " + std::to_string(KVROCKS_MAX_LSM_LEVEL)}; |
| } |
| std::vector<std::string> compression_per_level_builder; |
| compression_per_level_builder.reserve(KVROCKS_MAX_LSM_LEVEL); |
| |
| for (size_t i = 0; i < compression_start_level; i++) { |
| compression_per_level_builder.emplace_back("kNoCompression"); |
| } |
| for (size_t i = compression_start_level; i < KVROCKS_MAX_LSM_LEVEL; i++) { |
| compression_per_level_builder.emplace_back(compression_option); |
| } |
| const std::string compression_per_level = util::StringJoin( |
| compression_per_level_builder, [](const auto &s) -> decltype(auto) { return s; }, ":"); |
| return srv->storage->SetOptionForAllColumnFamilies("compression_per_level", compression_per_level); |
| }; |
| |
| Config::Config() { |
| deprecated_fields_ = {"rocksdb.row_cache_size"}; |
| |
| struct FieldWrapper { |
| std::string name; |
| bool readonly; |
| std::unique_ptr<ConfigField> field; |
| |
| FieldWrapper(std::string name, bool readonly, ConfigField *field) |
| : name(std::move(name)), readonly(readonly), field(field) {} |
| }; |
| |
| FieldWrapper fields[] = { |
| {"daemonize", true, new YesNoField(&daemonize, false)}, |
| {"bind", true, new StringField(&binds_str_, "")}, |
| {"port", true, new UInt32Field(&port, kDefaultPort, 1, PORT_LIMIT)}, |
| {"socket-fd", true, new IntField(&socket_fd, -1, -1, 1 << 16)}, |
| #ifdef ENABLE_OPENSSL |
| {"tls-port", true, new UInt32Field(&tls_port, 0, 0, PORT_LIMIT)}, |
| {"tls-cert-file", false, new StringField(&tls_cert_file, "")}, |
| {"tls-key-file", false, new StringField(&tls_key_file, "")}, |
| {"tls-key-file-pass", false, new StringField(&tls_key_file_pass, "")}, |
| {"tls-ca-cert-file", false, new StringField(&tls_ca_cert_file, "")}, |
| {"tls-ca-cert-dir", false, new StringField(&tls_ca_cert_dir, "")}, |
| {"tls-protocols", false, new StringField(&tls_protocols, "")}, |
| {"tls-auth-clients", false, new StringField(&tls_auth_clients, "")}, |
| {"tls-ciphers", false, new StringField(&tls_ciphers, "")}, |
| {"tls-ciphersuites", false, new StringField(&tls_ciphersuites, "")}, |
| {"tls-prefer-server-ciphers", false, new YesNoField(&tls_prefer_server_ciphers, false)}, |
| {"tls-session-caching", false, new YesNoField(&tls_session_caching, true)}, |
| {"tls-session-cache-size", false, new IntField(&tls_session_cache_size, 1024 * 20, 0, INT_MAX)}, |
| {"tls-session-cache-timeout", false, new IntField(&tls_session_cache_timeout, 300, 0, INT_MAX)}, |
| {"tls-replication", true, new YesNoField(&tls_replication, false)}, |
| #endif |
| {"workers", false, new IntField(&workers, 8, 1, 256)}, |
| {"timeout", false, new IntField(&timeout, 0, 0, INT_MAX)}, |
| {"tcp-backlog", true, new IntField(&backlog, 511, 0, INT_MAX)}, |
| {"maxclients", false, new IntField(&maxclients, 10240, 0, INT_MAX)}, |
| {"max-backup-to-keep", false, new IntField(&max_backup_to_keep, 1, 0, 1)}, |
| {"max-backup-keep-hours", false, new IntField(&max_backup_keep_hours, 0, 0, INT_MAX)}, |
| {"master-use-repl-port", false, new YesNoField(&master_use_repl_port, false)}, |
| {"requirepass", false, new StringField(&requirepass, "")}, |
| {"masterauth", false, new StringField(&masterauth, "")}, |
| {"slaveof", true, new StringField(&slaveof_, "")}, |
| {"replicaof", true, new StringField(&slaveof_, "")}, |
| {"compact-cron", false, new StringField(&compact_cron_str_, "")}, |
| {"bgsave-cron", false, new StringField(&bgsave_cron_str_, "")}, |
| {"dbsize-scan-cron", false, new StringField(&dbsize_scan_cron_str_, "")}, |
| {"replica-announce-ip", false, new StringField(&replica_announce_ip, "")}, |
| {"replica-announce-port", false, new UInt32Field(&replica_announce_port, 0, 0, PORT_LIMIT)}, |
| {"compaction-checker-range", false, new StringField(&compaction_checker_range_str_, "")}, |
| {"compaction-checker-cron", false, new StringField(&compaction_checker_cron_str_, "")}, |
| {"force-compact-file-age", false, new Int64Field(&force_compact_file_age, 2 * 24 * 3600, 60, INT64_MAX)}, |
| {"force-compact-file-min-deleted-percentage", false, |
| new IntField(&force_compact_file_min_deleted_percentage, 10, 1, 100)}, |
| {"db-name", true, new StringField(&db_name, "change.me.db")}, |
| {"dir", true, new StringField(&dir, kDefaultDir)}, |
| {"backup-dir", false, new StringField(&backup_dir, kDefaultBackupDir)}, |
| {"log-dir", true, new StringField(&log_dir, "")}, |
| {"log-level", false, new EnumField<spdlog::level::level_enum>(&log_level, log_levels, spdlog::level::info)}, |
| {"pidfile", true, new StringField(&pidfile, kDefaultPidfile)}, |
| {"max-io-mb", false, new IntField(&max_io_mb, 0, 0, INT_MAX)}, |
| {"enable-blob-cache", true, new YesNoField(&enable_blob_cache, false)}, |
| {"max-bitmap-to-string-mb", false, new IntField(&max_bitmap_to_string_mb, 16, 0, INT_MAX)}, |
| {"max-db-size", false, new IntField(&max_db_size, 0, 0, INT_MAX)}, |
| {"max-replication-mb", false, new IntField(&max_replication_mb, 0, 0, INT_MAX)}, |
| {"supervised", true, new EnumField<SupervisedMode>(&supervised_mode, supervised_modes, kSupervisedNone)}, |
| {"slave-serve-stale-data", false, new YesNoField(&slave_serve_stale_data, true)}, |
| {"slave-empty-db-before-fullsync", false, new YesNoField(&slave_empty_db_before_fullsync, false)}, |
| {"slave-priority", false, new IntField(&slave_priority, 100, 0, INT_MAX)}, |
| {"slave-read-only", false, new YesNoField(&slave_readonly, true)}, |
| {"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)}, |
| {"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)}, |
| {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)}, |
| {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)}, |
| {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)}, |
| {"profiling-sample-record-threshold-ms", false, |
| new IntField(&profiling_sample_record_threshold_ms, 100, 0, INT_MAX)}, |
| {"slowlog-log-slower-than", false, new IntField(&slowlog_log_slower_than, 200000, -1, INT_MAX)}, |
| {"profiling-sample-commands", false, new StringField(&profiling_sample_commands_str_, "")}, |
| {"slowlog-max-len", false, new IntField(&slowlog_max_len, 128, 0, INT_MAX)}, |
| {"slowlog-dump-logfile-level", false, |
| new EnumField<spdlog::level::level_enum>(&slowlog_dump_logfile_level, slowlog_dump_logfile_levels, |
| spdlog::level::off)}, |
| {"purge-backup-on-fullsync", false, new YesNoField(&purge_backup_on_fullsync, false)}, |
| {"rename-command", true, new MultiStringField(&rename_command_, std::vector<std::string>{})}, |
| {"auto-resize-block-and-sst", false, new YesNoField(&auto_resize_block_and_sst, true)}, |
| {"fullsync-recv-file-delay", false, new IntField(&fullsync_recv_file_delay, 0, 0, INT_MAX)}, |
| {"cluster-enabled", true, new YesNoField(&cluster_enabled, false)}, |
| {"migrate-speed", false, new IntField(&migrate_speed, 4096, 0, INT_MAX)}, |
| {"migrate-pipeline-size", false, new IntField(&pipeline_size, 16, 1, INT_MAX)}, |
| {"migrate-sequence-gap", false, new IntField(&sequence_gap, 10000, 1, INT_MAX)}, |
| {"migrate-type", false, |
| new EnumField<MigrationType>(&migrate_type, migration_types, MigrationType::kRawKeyValue)}, |
| {"migrate-batch-size-kb", false, new IntField(&migrate_batch_size_kb, 16, 1, INT_MAX)}, |
| {"migrate-batch-rate-limit-mb", false, new IntField(&migrate_batch_rate_limit_mb, 16, 0, INT_MAX)}, |
| {"unixsocket", true, new StringField(&unixsocket, "")}, |
| {"unixsocketperm", true, new OctalField(&unixsocketperm, 0777, 1, INT_MAX)}, |
| {"log-retention-days", true, new IntField(&log_retention_days, -1, -1, INT_MAX)}, |
| {"persist-cluster-nodes-enabled", false, new YesNoField(&persist_cluster_nodes_enabled, true)}, |
| {"redis-cursor-compatible", false, new YesNoField(&redis_cursor_compatible, true)}, |
| {"resp3-enabled", false, new YesNoField(&resp3_enabled, true)}, |
| {"repl-namespace-enabled", false, new YesNoField(&repl_namespace_enabled, false)}, |
| {"proto-max-bulk-len", false, |
| new IntWithUnitField<uint64_t>(&proto_max_bulk_len, std::to_string(512 * MiB), 1 * MiB, UINT64_MAX)}, |
| {"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth, 1024, 0, INT_MAX)}, |
| {"json-storage-format", false, |
| new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)}, |
| {"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)}, |
| {"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)}, |
| {"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")}, |
| |
| /* rocksdb options */ |
| {"rocksdb.compression", false, |
| new EnumField<rocksdb::CompressionType>(&rocks_db.compression, compression_types, |
| rocksdb::CompressionType::kNoCompression)}, |
| {"rocksdb.compression_level", true, new IntField(&rocks_db.compression_level, 32767, INT_MIN, INT_MAX)}, |
| {"rocksdb.compression_start_level", false, |
| new IntField(&rocks_db.compression_start_level, 2, 0, KVROCKS_MAX_LSM_LEVEL - 1)}, |
| {"rocksdb.block_size", true, new IntField(&rocks_db.block_size, 16384, 0, INT_MAX)}, |
| {"rocksdb.max_open_files", false, new IntField(&rocks_db.max_open_files, 8096, -1, INT_MAX)}, |
| {"rocksdb.write_buffer_size", false, new IntField(&rocks_db.write_buffer_size, 64, 0, 4096)}, |
| {"rocksdb.max_write_buffer_number", false, new IntField(&rocks_db.max_write_buffer_number, 4, 0, 256)}, |
| {"rocksdb.min_write_buffer_number_to_merge", false, |
| new IntField(&rocks_db.min_write_buffer_number_to_merge, 1, 1, 256)}, |
| {"rocksdb.target_file_size_base", false, new IntField(&rocks_db.target_file_size_base, 128, 1, 1024)}, |
| {"rocksdb.max_background_compactions", false, new IntField(&rocks_db.max_background_compactions, 2, -1, 32)}, |
| {"rocksdb.max_background_flushes", true, new IntField(&rocks_db.max_background_flushes, 2, -1, 32)}, |
| {"rocksdb.max_subcompactions", false, new IntField(&rocks_db.max_subcompactions, 2, 0, 16)}, |
| {"rocksdb.delayed_write_rate", false, new Int64Field(&rocks_db.delayed_write_rate, 0, 0, INT64_MAX)}, |
| {"rocksdb.wal_compression", true, |
| new EnumField<rocksdb::CompressionType>(&rocks_db.wal_compression, wal_compression_types, |
| rocksdb::CompressionType::kNoCompression)}, |
| {"rocksdb.wal_ttl_seconds", true, new IntField(&rocks_db.wal_ttl_seconds, 3 * 3600, 0, INT_MAX)}, |
| {"rocksdb.wal_size_limit_mb", true, new IntField(&rocks_db.wal_size_limit_mb, 16384, 0, INT_MAX)}, |
| {"rocksdb.dump_malloc_stats", true, new YesNoField(&rocks_db.dump_malloc_stats, true)}, |
| {"rocksdb.max_total_wal_size", false, new IntField(&rocks_db.max_total_wal_size, 64 * 4 * 2, 0, INT_MAX)}, |
| {"rocksdb.disable_auto_compactions", false, new YesNoField(&rocks_db.disable_auto_compactions, false)}, |
| {"rocksdb.enable_pipelined_write", true, new YesNoField(&rocks_db.enable_pipelined_write, false)}, |
| {"rocksdb.stats_dump_period_sec", false, new IntField(&rocks_db.stats_dump_period_sec, 0, 0, INT_MAX)}, |
| {"rocksdb.cache_index_and_filter_blocks", true, new YesNoField(&rocks_db.cache_index_and_filter_blocks, true)}, |
| {"rocksdb.block_cache_size", true, new IntField(&rocks_db.block_cache_size, 0, 0, INT_MAX)}, |
| {"rocksdb.block_cache_type", true, |
| new EnumField<BlockCacheType>(&rocks_db.block_cache_type, cache_types, BlockCacheType::kCacheTypeLRU)}, |
| {"rocksdb.subkey_block_cache_size", true, new IntField(&rocks_db.subkey_block_cache_size, 2048, 0, INT_MAX)}, |
| {"rocksdb.metadata_block_cache_size", true, new IntField(&rocks_db.metadata_block_cache_size, 2048, 0, INT_MAX)}, |
| {"rocksdb.share_metadata_and_subkey_block_cache", true, |
| new YesNoField(&rocks_db.share_metadata_and_subkey_block_cache, true)}, |
| {"rocksdb.compaction_readahead_size", false, |
| new IntField(&rocks_db.compaction_readahead_size, 2 * MiB, 0, 64 * MiB)}, |
| {"rocksdb.level0_slowdown_writes_trigger", false, |
| new IntField(&rocks_db.level0_slowdown_writes_trigger, 20, 0, 1024)}, |
| {"rocksdb.level0_stop_writes_trigger", false, new IntField(&rocks_db.level0_stop_writes_trigger, 40, 1, 1024)}, |
| {"rocksdb.level0_file_num_compaction_trigger", false, |
| new IntField(&rocks_db.level0_file_num_compaction_trigger, 4, 1, 1024)}, |
| {"rocksdb.enable_blob_files", false, new YesNoField(&rocks_db.enable_blob_files, false)}, |
| {"rocksdb.min_blob_size", false, new IntField(&rocks_db.min_blob_size, 4096, 0, INT_MAX)}, |
| {"rocksdb.blob_file_size", false, new IntField(&rocks_db.blob_file_size, 268435456, 0, INT_MAX)}, |
| {"rocksdb.enable_blob_garbage_collection", false, new YesNoField(&rocks_db.enable_blob_garbage_collection, true)}, |
| {"rocksdb.blob_garbage_collection_age_cutoff", false, |
| new IntField(&rocks_db.blob_garbage_collection_age_cutoff, 25, 0, 100)}, |
| {"rocksdb.max_bytes_for_level_base", false, |
| new UInt64Field(&rocks_db.max_bytes_for_level_base, 268435456ULL, 0ULL, UINT64_MAX)}, |
| {"rocksdb.max_bytes_for_level_multiplier", false, |
| new IntField(&rocks_db.max_bytes_for_level_multiplier, 10, 1, 100)}, |
| {"rocksdb.level_compaction_dynamic_level_bytes", false, |
| new YesNoField(&rocks_db.level_compaction_dynamic_level_bytes, false)}, |
| {"rocksdb.max_background_jobs", false, new IntField(&rocks_db.max_background_jobs, 4, 0, 32)}, |
| {"rocksdb.rate_limiter_auto_tuned", true, new YesNoField(&rocks_db.rate_limiter_auto_tuned, true)}, |
| {"rocksdb.avoid_unnecessary_blocking_io", true, new YesNoField(&rocks_db.avoid_unnecessary_blocking_io, true)}, |
| {"rocksdb.partition_filters", true, new YesNoField(&rocks_db.partition_filters, true)}, |
| {"rocksdb.max_compaction_bytes", false, new Int64Field(&rocks_db.max_compaction_bytes, 0, 0, INT64_MAX)}, |
| {"rocksdb.sst_file_delete_rate_bytes_per_sec", false, |
| new Int64Field(&rocks_db.sst_file_delete_rate_bytes_per_sec, 0, 0, INT64_MAX)}, |
| |
| /* rocksdb write options */ |
| {"rocksdb.write_options.sync", true, new YesNoField(&rocks_db.write_options.sync, false)}, |
| {"rocksdb.write_options.disable_wal", true, new YesNoField(&rocks_db.write_options.disable_wal, false)}, |
| {"rocksdb.write_options.no_slowdown", true, new YesNoField(&rocks_db.write_options.no_slowdown, false)}, |
| {"rocksdb.write_options.low_pri", true, new YesNoField(&rocks_db.write_options.low_pri, false)}, |
| {"rocksdb.write_options.memtable_insert_hint_per_batch", true, |
| new YesNoField(&rocks_db.write_options.memtable_insert_hint_per_batch, false)}, |
| {"rocksdb.write_options.write_batch_max_bytes", false, |
| new IntField(&rocks_db.write_options.write_batch_max_bytes, 0, 0, INT_MAX)}, |
| |
| /* rocksdb read options */ |
| {"rocksdb.read_options.async_io", false, new YesNoField(&rocks_db.read_options.async_io, true)}, |
| }; |
| for (auto &wrapper : fields) { |
| auto &field = wrapper.field; |
| field->readonly = wrapper.readonly; |
| fields_.emplace(std::move(wrapper.name), std::move(field)); |
| } |
| initFieldValidator(); |
| initFieldCallback(); |
| } |
| |
| // The validate function would be invoked before the field was set, |
| // to make sure that new value is valid. |
| void Config::initFieldValidator() { |
| std::map<std::string, ValidateFn> validators = { |
| {"requirepass", |
| [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| if (v.empty() && !load_tokens.empty()) { |
| return {Status::NotOK, "requirepass empty not allowed while the namespace exists"}; |
| } |
| if (load_tokens.find(v) != load_tokens.end()) { |
| return {Status::NotOK, "requirepass is duplicated with namespace tokens"}; |
| } |
| return Status::OK(); |
| }}, |
| {"masterauth", |
| [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| if (load_tokens.find(v) != load_tokens.end()) { |
| return {Status::NotOK, "masterauth is duplicated with namespace tokens"}; |
| } |
| return Status::OK(); |
| }}, |
| {"compact-cron", |
| [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::vector<std::string> args = util::Split(v, " \t"); |
| return compact_cron.SetScheduleTime(args); |
| }}, |
| {"bgsave-cron", |
| [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::vector<std::string> args = util::Split(v, " \t"); |
| return bgsave_cron.SetScheduleTime(args); |
| }}, |
| {"dbsize-scan-cron", |
| [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::vector<std::string> args = util::Split(v, " \t"); |
| return dbsize_scan_cron.SetScheduleTime(args); |
| }}, |
| {"compaction-checker-range", |
| [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| if (!compaction_checker_cron_str_.empty()) { |
| return {Status::NotOK, "compaction-checker-range cannot be set while compaction-checker-cron is set"}; |
| } |
| if (v.empty()) { |
| compaction_checker_cron.Clear(); |
| return Status::OK(); |
| } |
| return compaction_checker_cron.SetScheduleTime({"*", v, "*", "*", "*"}); |
| }}, |
| {"compaction-checker-cron", |
| [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::vector<std::string> args = util::Split(v, " \t"); |
| return compaction_checker_cron.SetScheduleTime(args); |
| }}, |
| {"rename-command", |
| []([[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::vector<std::string> all_args = util::Split(v, "\n"); |
| for (auto &p : all_args) { |
| std::vector<std::string> args = util::Split(p, " \t"); |
| if (args.size() != 2) { |
| return {Status::NotOK, "Invalid rename-command format"}; |
| } |
| auto commands = redis::CommandTable::Get(); |
| auto cmd_iter = commands->find(util::ToLower(args[0])); |
| if (cmd_iter == commands->end()) { |
| return {Status::NotOK, "No such command in rename-command"}; |
| } |
| if (args[1] != "\"\"") { |
| auto new_command_name = util::ToLower(args[1]); |
| if (commands->find(new_command_name) != commands->end()) { |
| return {Status::NotOK, "Target command name already exists"}; |
| } |
| (*commands)[new_command_name] = cmd_iter->second; |
| } |
| commands->erase(cmd_iter); |
| } |
| return Status::OK(); |
| }}, |
| }; |
| for (const auto &iter : validators) { |
| auto field_iter = fields_.find(iter.first); |
| if (field_iter != fields_.end()) { |
| field_iter->second->validate = iter.second; |
| } |
| } |
| } |
| |
// The callback function is invoked after the field has been set.
// It may change related fields or re-format the field. For example,
// when 'dir' is set, the db-dir or backup-dir should be reset as well.
| void Config::initFieldCallback() { |
| auto set_db_option_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); // srv is nullptr when load config from file |
| return srv->storage->SetDBOption(TrimRocksDbPrefix(k), v); |
| }; |
| auto set_cf_option_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); // srv is nullptr when load config from file |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v); |
| }; |
| |
| auto set_compression_type_cb = [](Server *srv, [[maybe_unused]] const std::string &k, |
| [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| return SetRocksdbCompression(srv, srv->GetConfig()->rocks_db.compression, |
| srv->GetConfig()->rocks_db.compression_start_level); |
| }; |
| auto set_compression_start_level_cb = [](Server *srv, [[maybe_unused]] const std::string &k, |
| [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| return SetRocksdbCompression(srv, srv->GetConfig()->rocks_db.compression, |
| srv->GetConfig()->rocks_db.compression_start_level); |
| }; |
| |
| #ifdef ENABLE_OPENSSL |
| auto set_tls_option = [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) { |
| if (!srv) return Status::OK(); // srv is nullptr when load config from file |
| auto new_ctx = CreateSSLContext(srv->GetConfig()); |
| if (!new_ctx) { |
| return Status(Status::NotOK, "Failed to configure SSL context, check server log for more details"); |
| } |
| srv->ssl_ctx = std::move(new_ctx); |
| return Status::OK(); |
| }; |
| #endif |
| |
| auto replicaof_cb = [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, |
| const std::string &v) -> Status { |
| if (v.empty()) { |
| return Status::OK(); |
| } |
| std::vector<std::string> args = util::Split(v, " \t"); |
| if (args.size() != 2) return {Status::NotOK, "wrong number of arguments"}; |
| if (args[0] != "no" && args[1] != "one") { |
| master_host = args[0]; |
| auto parse_result = ParseInt<int>(args[1], NumericRange<int>{1, PORT_LIMIT - 1}, 10); |
| if (!parse_result) { |
| return {Status::NotOK, "should be between 0 and 65535"}; |
| } |
| master_port = *parse_result; |
| } |
| return Status::OK(); |
| }; |
| |
| std::map<std::string, CallbackFn> callbacks = |
| { |
| {"workers", |
| [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->AdjustWorkerThreads(); |
| return Status::OK(); |
| }}, |
| {"dir", |
| [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, |
| [[maybe_unused]] const std::string &v) -> Status { |
| db_dir = dir + "/db"; |
| if (log_dir.empty()) log_dir = dir + ",stdout"; |
| checkpoint_dir = dir + "/checkpoint"; |
| sync_checkpoint_dir = dir + "/sync_checkpoint"; |
| backup_sync_dir = dir + "/backup_for_sync"; |
| if (backup_dir == kDefaultBackupDir) backup_dir = dir + "/backup"; |
| if (pidfile == kDefaultPidfile) pidfile = dir + "/kvrocks.pid"; |
| return Status::OK(); |
| }}, |
| {"backup-dir", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::string previous_backup; |
| { |
| // Note: currently, backup_mu_ may block by backing up or purging, |
| // the command may wait for seconds. |
| std::lock_guard<std::mutex> lg(this->backup_mu); |
| previous_backup = std::move(backup_dir); |
| backup_dir = v; |
| } |
| if (!previous_backup.empty() && srv != nullptr && !srv->IsLoading()) { |
| // info() should be called after log is initialized and server is loaded. |
| info("change backup dir from {} to {}", previous_backup, v); |
| } |
| return Status::OK(); |
| }}, |
| {"cluster-enabled", |
| [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, |
| [[maybe_unused]] const std::string &v) -> Status { |
| if (cluster_enabled) slot_id_encoded = true; |
| return Status::OK(); |
| }}, |
| {"bind", |
| [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::vector<std::string> args = util::Split(v, " \t"); |
| binds = std::move(args); |
| return Status::OK(); |
| }}, |
| {"maxclients", |
| [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->AdjustOpenFilesLimit(); |
| return Status::OK(); |
| }}, |
| {"slaveof", replicaof_cb}, |
| {"replicaof", replicaof_cb}, |
| {"profiling-sample-commands", |
| [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::vector<std::string> cmds = util::Split(v, ","); |
| profiling_sample_all_commands = false; |
| profiling_sample_commands.clear(); |
| for (auto const &cmd : cmds) { |
| if (cmd == "*") { |
| profiling_sample_all_commands = true; |
| profiling_sample_commands.clear(); |
| return Status::OK(); |
| } |
| if (!redis::CommandTable::IsExists(cmd)) { |
| return {Status::NotOK, cmd + " is not Kvrocks supported command"}; |
| } |
| // profiling_sample_commands use command's original name, regardless of rename-command directive |
| profiling_sample_commands.insert(cmd); |
| } |
| return Status::OK(); |
| }}, |
| {"slowlog-max-len", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->GetSlowLog()->SetMaxEntries(slowlog_max_len); |
| return Status::OK(); |
| }}, |
| {"slowlog-dump-logfile-level", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->GetSlowLog()->SetDumpToLogfileLevel(slowlog_dump_logfile_level); |
| return Status::OK(); |
| }}, |
| {"max-db-size", |
| [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->storage->CheckDBSizeLimit(); |
| return Status::OK(); |
| }}, |
| {"max-io-mb", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->storage->SetIORateLimit(max_io_mb); |
| return Status::OK(); |
| }}, |
| {"profiling-sample-record-max-len", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->GetPerfLog()->SetMaxEntries(profiling_sample_record_max_len); |
| return Status::OK(); |
| }}, |
| {"migrate-speed", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (cluster_enabled) srv->slot_migrator->SetMaxMigrationSpeed(migrate_speed); |
| return Status::OK(); |
| }}, |
| {"migrate-pipeline-size", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (cluster_enabled) srv->slot_migrator->SetMaxPipelineSize(pipeline_size); |
| return Status::OK(); |
| }}, |
| {"migrate-sequence-gap", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (cluster_enabled) srv->slot_migrator->SetSequenceGapLimit(sequence_gap); |
| return Status::OK(); |
| }}, |
| {"migrate-batch-rate-limit-mb", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->slot_migrator->SetMigrateBatchRateLimit(migrate_batch_rate_limit_mb * MiB); |
| return Status::OK(); |
| }}, |
| {"migrate-batch-size-kb", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB); |
| return Status::OK(); |
| }}, |
| {"log-level", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| spdlog::set_level(log_level); |
| return Status::OK(); |
| }}, |
| {"persist-cluster-nodes-enabled", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| if (!srv || !cluster_enabled) return Status::OK(); |
| auto nodes_file_path = NodesFilePath(); |
| if (v == "yes") { |
| return srv->cluster->DumpClusterNodes(nodes_file_path); |
| } |
| // Remove the cluster nodes file to avoid stale cluster nodes info |
| remove(nodes_file_path.data()); |
| return Status::OK(); |
| }}, |
| {"repl-namespace-enabled", |
| [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| return srv->GetNamespace()->LoadAndRewrite(); |
| }}, |
| |
| {"rocksdb.target_file_size_base", |
| [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), |
| std::to_string(rocks_db.target_file_size_base * MiB)); |
| }}, |
| {"rocksdb.write_buffer_size", |
| [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), |
| std::to_string(rocks_db.write_buffer_size * MiB)); |
| }}, |
| {"rocksdb.disable_auto_compactions", |
| [](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| std::string disable_auto_compactions = v == "yes" ? "true" : "false"; |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), disable_auto_compactions); |
| }}, |
| {"rocksdb.max_total_wal_size", |
| [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| return srv->storage->SetDBOption(TrimRocksDbPrefix(k), std::to_string(rocks_db.max_total_wal_size * MiB)); |
| }}, |
| {"rocksdb.enable_blob_files", |
| [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| std::string enable_blob_files = rocks_db.enable_blob_files ? "true" : "false"; |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), enable_blob_files); |
| }}, |
| {"rocksdb.min_blob_size", |
| [this](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (!rocks_db.enable_blob_files) { |
| return {Status::NotOK, errBlobDbNotEnabled}; |
| } |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v); |
| }}, |
| {"rocksdb.blob_file_size", |
| [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (!rocks_db.enable_blob_files) { |
| return {Status::NotOK, errBlobDbNotEnabled}; |
| } |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), |
| std::to_string(rocks_db.blob_file_size)); |
| }}, |
| {"rocksdb.enable_blob_garbage_collection", |
| [this](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (!rocks_db.enable_blob_files) { |
| return {Status::NotOK, errBlobDbNotEnabled}; |
| } |
| std::string enable_blob_garbage_collection = v == "yes" ? "true" : "false"; |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), enable_blob_garbage_collection); |
| }}, |
| {"rocksdb.blob_garbage_collection_age_cutoff", |
| [this](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (!rocks_db.enable_blob_files) { |
| return {Status::NotOK, errBlobDbNotEnabled}; |
| } |
| int val = 0; |
| auto parse_result = ParseInt<int>(v, 10); |
| if (!parse_result) { |
| return {Status::NotOK, "Illegal blob_garbage_collection_age_cutoff value."}; |
| } |
| val = *parse_result; |
| if (val < 0 || val > 100) { |
| return {Status::NotOK, "blob_garbage_collection_age_cutoff must >= 0 and <= 100."}; |
| } |
| |
| double cutoff = val / 100.0; |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), std::to_string(cutoff)); |
| }}, |
| {"rocksdb.level_compaction_dynamic_level_bytes", |
| [](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| std::string level_compaction_dynamic_level_bytes = v == "yes" ? "true" : "false"; |
| return srv->storage->SetDBOption(TrimRocksDbPrefix(k), level_compaction_dynamic_level_bytes); |
| }}, |
| {"rocksdb.max_bytes_for_level_base", |
| [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (!rocks_db.level_compaction_dynamic_level_bytes) { |
| return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet}; |
| } |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), |
| std::to_string(rocks_db.max_bytes_for_level_base)); |
| }}, |
| {"rocksdb.max_bytes_for_level_multiplier", |
| [this](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| if (!rocks_db.level_compaction_dynamic_level_bytes) { |
| return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet}; |
| } |
| return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v); |
| }}, |
| {"rocksdb.sst_file_delete_rate_bytes_per_sec", |
| [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| srv->storage->SetSstFileDeleteRateBytesPerSecond(rocks_db.sst_file_delete_rate_bytes_per_sec); |
| return Status::OK(); |
| }}, |
| {"rocksdb.level0_slowdown_writes_trigger", |
| [this, &set_cf_option_cb](Server *srv, const std::string &k, |
| [[maybe_unused]] const std::string &v) -> Status { |
| if (rocks_db.level0_slowdown_writes_trigger == 0) { |
| return set_cf_option_cb(srv, k, std::to_string(rocks_db.level0_stop_writes_trigger)); |
| } |
| |
| return set_cf_option_cb(srv, k, v); |
| }}, |
| {"rocksdb.level0_stop_writes_trigger", |
| [this](Server *srv, const std::string &k, const std::string &v) -> Status { |
| if (!srv) return Status::OK(); |
| |
| std::unordered_map<std::string, std::string> options = { |
| {TrimRocksDbPrefix(k), v}, |
| }; |
| |
| if (rocks_db.level0_slowdown_writes_trigger == 0) { |
| options["level0_slowdown_writes_trigger"] = v; |
| } |
| |
| return srv->storage->SetOptionForAllColumnFamilies(options); |
| }}, |
| {"rocksdb.max_open_files", set_db_option_cb}, |
| {"rocksdb.stats_dump_period_sec", set_db_option_cb}, |
| {"rocksdb.delayed_write_rate", set_db_option_cb}, |
| {"rocksdb.max_background_compactions", set_db_option_cb}, |
| {"rocksdb.max_background_flushes", set_db_option_cb}, |
| {"rocksdb.max_subcompactions", set_db_option_cb}, |
| {"rocksdb.compaction_readahead_size", set_db_option_cb}, |
| {"rocksdb.max_background_jobs", set_db_option_cb}, |
| |
| {"rocksdb.max_compaction_bytes", set_cf_option_cb}, |
| {"rocksdb.max_write_buffer_number", set_cf_option_cb}, |
| {"rocksdb.min_write_buffer_number_to_merge", set_cf_option_cb}, |
| {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb}, |
| {"rocksdb.compression", set_compression_type_cb}, |
| {"rocksdb.compression_start_level", set_compression_start_level_cb}, |
| #ifdef ENABLE_OPENSSL |
| {"tls-cert-file", set_tls_option}, |
| {"tls-key-file", set_tls_option}, |
| {"tls-key-file-pass", set_tls_option}, |
| {"tls-ca-cert-file", set_tls_option}, |
| {"tls-ca-cert-dir", set_tls_option}, |
| {"tls-protocols", set_tls_option}, |
| {"tls-auth-clients", set_tls_option}, |
| {"tls-ciphers", set_tls_option}, |
| {"tls-ciphersuites", set_tls_option}, |
| {"tls-prefer-server-ciphers", set_tls_option}, |
| {"tls-session-caching", set_tls_option}, |
| {"tls-session-cache-size", set_tls_option}, |
| {"tls-session-cache-timeout", set_tls_option}, |
| #endif |
| {"histogram-bucket-boundaries", |
| [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { |
| std::vector<std::string> buckets = util::Split(v, ","); |
| histogram_bucket_boundaries.clear(); |
| if (buckets.size() < 1) { |
| return Status::OK(); |
| } |
| for (const auto &bucket_val : buckets) { |
| auto parse_result = ParseFloat<double>(bucket_val); |
| if (!parse_result) { |
| return {Status::NotOK, "The values in the bucket list must be double or integer."}; |
| } |
| histogram_bucket_boundaries.push_back(*parse_result); |
| } |
| if (!std::is_sorted(histogram_bucket_boundaries.begin(), histogram_bucket_boundaries.end())) { |
| return {Status::NotOK, "The values for the histogram must be sorted."}; |
| } |
| return Status::OK(); |
| }}, |
| }; |
| for (const auto &iter : callbacks) { |
| auto field_iter = fields_.find(iter.first); |
| if (field_iter != fields_.end()) { |
| field_iter->second->callback = iter.second; |
| } |
| } |
| } |
| |
| std::string Config::NodesFilePath() const { return dir + "/nodes.conf"; } |
| |
| void Config::SetMaster(const std::string &host, uint32_t port) { |
| master_host = host; |
| master_port = port; |
| auto iter = fields_.find("slaveof"); |
| if (iter != fields_.end()) { |
| auto s = iter->second->Set(master_host + " " + std::to_string(master_port)); |
| if (!s.IsOK()) { |
| error("Failed to set the value of 'slaveof' setting: {}", s.Msg()); |
| } |
| } |
| } |
| |
| void Config::ClearMaster() { |
| master_host.clear(); |
| master_port = 0; |
| auto iter = fields_.find("slaveof"); |
| if (iter != fields_.end()) { |
| auto s = iter->second->Set("no one"); |
| if (!s.IsOK()) { |
| error("Failed to clear the value of 'slaveof' setting: {}", s.Msg()); |
| } |
| } |
| } |
| |
| Status Config::parseConfigFromPair(const std::pair<std::string, std::string> &input, int line_number) { |
| std::string field_key = util::ToLower(input.first); |
| constexpr std::string_view ns_str = "namespace."; |
| if (util::StartsWithICase(input.first, ns_str)) { |
| // namespace should keep key case-sensitive |
| field_key = input.first; |
| load_tokens[input.second] = input.first.substr(ns_str.size()); |
| return Status::OK(); |
| } |
| |
| auto iter = fields_.find(field_key); |
| if (iter != fields_.end()) { |
| auto &field = iter->second; |
| field->line_number = line_number; |
| auto s = field->Set(input.second); |
| if (!s.IsOK()) return s.Prefixed(fmt::format("failed to set value of field '{}'", field_key)); |
| } else if (deprecated_fields_.find(field_key) != deprecated_fields_.end()) { |
| std::cout << fmt::format("WARNING: '{}' at line {} is deprecated and does not take effect.", field_key, line_number) |
| << std::endl; |
| } else { |
| std::cout << fmt::format("WARNING: '{}' at line {} is not a valid configuration key.", field_key, line_number) |
| << std::endl; |
| } |
| return Status::OK(); |
| } |
| |
| Status Config::parseConfigFromString(const std::string &input, int line_number) { |
| auto parsed = ParseConfigLine(input); |
| if (!parsed) return parsed.ToStatus().Prefixed("malformed line"); |
| |
| auto kv = std::move(*parsed); |
| |
| if (kv.first.empty() || kv.second.empty()) return Status::OK(); |
| |
| return parseConfigFromPair(kv, line_number); |
| } |
| |
| Status Config::finish() { |
| if (requirepass.empty() && !load_tokens.empty()) { |
| return {Status::NotOK, "requirepass empty wasn't allowed while the namespace exists"}; |
| } |
| if ((cluster_enabled) && !load_tokens.empty()) { |
| return {Status::NotOK, "enabled cluster mode wasn't allowed while the namespace exists"}; |
| } |
| if (unixsocket.empty() && binds.size() == 0) { |
| binds.emplace_back(kDefaultBindAddress); |
| } |
| if (cluster_enabled && binds.size() == 0) { |
| return {Status::NotOK, |
| "node is in cluster mode, but TCP listen address " |
| "wasn't specified via configuration file"}; |
| } |
| if (master_port != 0 && binds.size() == 0) { |
| return {Status::NotOK, "replication doesn't support unix socket"}; |
| } |
| if (db_dir.empty()) db_dir = dir + "/db"; |
| if (log_dir.empty()) log_dir = dir + ",stdout"; |
| std::vector<std::string> create_dirs = {dir}; |
| for (const auto &name : create_dirs) { |
| auto s = rocksdb::Env::Default()->CreateDirIfMissing(name); |
| if (!s.ok()) return {Status::NotOK, s.ToString()}; |
| } |
| return Status::OK(); |
| } |
| |
| Status Config::Load(const CLIOptions &opts) { |
| if (!opts.conf_file.empty()) { |
| std::ifstream file; |
| std::istream *in = nullptr; |
| if (opts.conf_file == "-") { |
| in = &std::cin; |
| } else { |
| path_ = opts.conf_file; |
| file.open(path_); |
| if (!file.is_open()) { |
| return {Status::NotOK, fmt::format("failed to open file '{}': {}", path_, strerror(errno))}; |
| } |
| |
| in = &file; |
| } |
| |
| std::string line; |
| int line_num = 1; |
| while (in->good() && std::getline(*in, line)) { |
| if (auto s = parseConfigFromString(line, line_num); !s.IsOK()) { |
| return s.Prefixed(fmt::format("at line #L{}", line_num)); |
| } |
| |
| line_num++; |
| } |
| } else { |
| std::cout << "WARNING: No config file specified, default configuration applied. " |
| << "In order to specify a config file, use `kvrocks -c /path/to/kvrocks.conf`." << std::endl; |
| } |
| |
| for (const auto &opt : opts.cli_options) { |
| GET_OR_RET(parseConfigFromPair(opt, -1).Prefixed("CLI config option error")); |
| } |
| |
| for (const auto &iter : fields_) { |
| // line_number = 0 means the user didn't specify the field value |
| // on config file and would use default value, so won't validate here. |
| if (iter.second->line_number != 0 && iter.second->validate) { |
| auto s = iter.second->validate(iter.first, iter.second->ToString()); |
| if (!s.IsOK()) { |
| return s.Prefixed(fmt::format("at line #L{}: {} is invalid", iter.second->line_number, iter.first)); |
| } |
| } |
| } |
| |
| for (const auto &iter : fields_) { |
| if (iter.second->callback) { |
| auto s = iter.second->callback(nullptr, iter.first, iter.second->ToString()); |
| if (!s.IsOK()) { |
| return s.Prefixed(fmt::format("while changing key '{}'", iter.first)); |
| } |
| } |
| } |
| return finish(); |
| } |
| |
| void Config::Get(const std::string &key, std::vector<std::string> *values) const { |
| values->clear(); |
| for (const auto &iter : fields_) { |
| if (util::StringMatch(key, iter.first, true)) { |
| if (iter.second->IsMultiConfig()) { |
| for (const auto &p : util::Split(iter.second->ToString(), "\n")) { |
| values->emplace_back(iter.first); |
| values->emplace_back(p); |
| } |
| } else { |
| values->emplace_back(iter.first); |
| values->emplace_back(iter.second->ToString()); |
| } |
| } |
| } |
| } |
| |
| Status Config::Set(Server *srv, std::string key, const std::string &value) { |
| key = util::ToLower(key); |
| auto iter = fields_.find(key); |
| if (iter == fields_.end() || iter->second->readonly) { |
| return {Status::NotOK, "Unsupported CONFIG parameter: " + key}; |
| } |
| |
| auto &field = iter->second; |
| if (field->validate) { |
| auto s = field->validate(key, value); |
| if (!s.IsOK()) return s.Prefixed("invalid value"); |
| } |
| |
| auto origin_value = field->ToStringForRewrite(); |
| auto s = field->Set(value); |
| if (!s.IsOK()) return s.Prefixed("failed to set new value"); |
| |
| if (field->callback) { |
| s = field->callback(srv, key, value); |
| if (!s.IsOK()) { |
| // rollback the value if the callback failed |
| auto set_status = field->Set(origin_value); |
| if (!set_status.IsOK()) { |
| return set_status.Prefixed("failed to rollback the value"); |
| } |
| } |
| return s; |
| } |
| |
| return Status::OK(); |
| } |
| |
| bool Config::checkFieldValueIsDefault(const std::string &key, const std::string &value) const { |
| auto iter = fields_.find(key); |
| return iter != fields_.end() && iter->second->Default() == value; |
| } |
| |
| Status Config::Rewrite(const std::map<std::string, std::string> &tokens) { |
| if (!HasConfigFile()) { |
| return {Status::NotOK, "the server is running without a config file"}; |
| } |
| |
| std::vector<std::string> lines; |
| std::map<std::string, std::string> new_config; |
| for (const auto &iter : fields_) { |
| if (iter.second->IsMultiConfig()) { |
| // We should NOT overwrite the commands which are MultiConfig since it cannot be rewritten in-flight, |
| // so skip it here to avoid rewriting it as new item. |
| continue; |
| } |
| new_config[iter.first] = iter.second->ToStringForRewrite(); |
| } |
| |
| std::string namespace_prefix = "namespace."; |
| if (!repl_namespace_enabled) { // need to rewrite to the configuration if we don't replicate namespaces |
| for (const auto &iter : tokens) { |
| new_config[namespace_prefix + iter.second] = iter.first; |
| } |
| } |
| |
| std::ifstream file(path_); |
| if (file.is_open()) { |
| std::string raw_line; |
| while (file.good() && std::getline(file, raw_line)) { |
| auto parsed = ParseConfigLine(raw_line); |
| if (!parsed || parsed->first.empty()) { |
| lines.emplace_back(raw_line); |
| continue; |
| } |
| auto kv = std::move(*parsed); |
| if (util::StartsWith(kv.first, namespace_prefix)) { |
| // Ignore namespace fields here since we would always rewrite them |
| continue; |
| } |
| auto iter = new_config.find(util::ToLower(kv.first)); |
| if (iter != new_config.end()) { |
| if (!iter->second.empty()) lines.emplace_back(DumpConfigLine({iter->first, iter->second})); |
| new_config.erase(iter); |
| } else { |
| lines.emplace_back(raw_line); |
| } |
| } |
| } |
| file.close(); |
| |
| std::string out_buf; |
| for (const auto &line : lines) { |
| fmt::format_to(std::back_inserter(out_buf), "{}\n", line); |
| } |
| for (const auto &remain : new_config) { |
| if (remain.second.empty() || checkFieldValueIsDefault(remain.first, remain.second)) continue; |
| fmt::format_to(std::back_inserter(out_buf), "{}\n", DumpConfigLine({remain.first, remain.second})); |
| } |
| std::string tmp_path = path_ + ".tmp"; |
| remove(tmp_path.data()); |
| std::ofstream output_file(tmp_path, std::ios::out); |
| output_file << out_buf; |
| output_file.close(); |
| if (rename(tmp_path.data(), path_.data()) < 0) { |
| return {Status::NotOK, fmt::format("rename file encounter error: {}", strerror(errno))}; |
| } |
| return Status::OK(); |
| } |