| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "server.h" |
| |
| #include <rocksdb/convenience.h> |
| #include <rocksdb/statistics.h> |
| #include <sys/resource.h> |
| #include <sys/statvfs.h> |
| #include <sys/utsname.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <iomanip> |
| #include <jsoncons/json.hpp> |
| #include <memory> |
| #include <mutex> |
| #include <shared_mutex> |
| #include <utility> |
| |
| #include "commands/commander.h" |
| #include "common/string_util.h" |
| #include "config/config.h" |
| #include "fmt/format.h" |
| #include "logging.h" |
| #include "redis_connection.h" |
| #include "rocksdb/version.h" |
| #include "storage/compaction_checker.h" |
| #include "storage/redis_db.h" |
| #include "storage/scripting.h" |
| #include "storage/storage.h" |
| #include "thread_util.h" |
| #include "time_util.h" |
| #include "version.h" |
| #include "worker.h" |
| |
| Server::Server(engine::Storage *storage, Config *config) |
| : stats(config->histogram_bucket_boundaries), |
| storage(storage), |
| indexer(storage), |
| index_mgr(&indexer, storage), |
| start_time_secs_(util::GetTimeStamp()), |
| config_(config), |
| namespace_(storage) { |
| // Init command stats here to prevent concurrent inserts, which could cause a crash |
| auto commands = redis::CommandTable::GetOriginal(); |
| |
| for (const auto &iter : *commands) { |
| stats.commands_stats[iter.first].calls = 0; |
| stats.commands_stats[iter.first].latency = 0; |
| |
| if (!stats.bucket_boundaries.empty()) { |
| // NB: Extra index for the last bucket (Inf) |
| for (std::size_t i{0}; i <= stats.bucket_boundaries.size(); ++i) { |
| stats.commands_histogram[iter.first].buckets.push_back(std::make_unique<std::atomic<uint64_t>>(0)); |
| } |
| stats.commands_histogram[iter.first].calls = 0; |
| stats.commands_histogram[iter.first].sum = 0; |
| } |
| } |
| |
| // init cursor_dict_ |
| cursor_dict_ = std::make_unique<CursorDictType>(); |
| |
| #ifdef ENABLE_OPENSSL |
| // init ssl context |
| if (config->tls_port || config->tls_replication) { |
| ssl_ctx = CreateSSLContext(config); |
| if (!ssl_ctx) { |
| exit(1); |
| } |
| } |
| #endif |
| |
| // Init cluster |
| cluster = std::make_unique<Cluster>(this, config_->binds, config_->port); |
| |
| // init shard pub/sub channels |
| pubsub_shard_channels_.resize(config->cluster_enabled ? HASH_SLOTS_SIZE : 1); |
| |
| for (int i = 0; i < config->workers; i++) { |
| auto worker = std::make_unique<Worker>(this, config); |
| // multiple workers can't listen on the same unix socket, so only |
| // a single worker - the first one - listens on it |
| if (!config->unixsocket.empty() && i == 0) { |
| Status s = worker->ListenUnixSocket(config->unixsocket, config->unixsocketperm, config->backlog); |
| if (!s.IsOK()) { |
| error("[server] Failed to listen on unix socket: {}. Error: {}", config->unixsocket, s.Msg()); |
| exit(1); |
| } |
| info("[server] Listening on unix socket: {}", config->unixsocket); |
| } |
| worker_threads_.emplace_back(std::make_unique<WorkerThread>(std::move(worker))); |
| } |
| |
| AdjustOpenFilesLimit(); |
| slow_log_.SetMaxEntries(config->slowlog_max_len); |
| slow_log_.SetDumpToLogfileLevel(config->slowlog_dump_logfile_level); |
| perf_log_.SetMaxEntries(config->profiling_sample_record_max_len); |
| } |
| |
| Server::~Server() { |
| DisconnectSlaves(); |
| // Wait for all fetch-file threads to stop and exit, and force destroy the server after 60s. |
| int counter = 0; |
| while (GetFetchFileThreadNum() != 0) { |
| usleep(100000); |
| if (++counter == 600) { |
| warn("[server] Will force destroy the server after waiting 60s, leave {} fetch file threads are still running", |
| GetFetchFileThreadNum()); |
| break; |
| } |
| } |
| |
| for (auto &worker_thread : worker_threads_) { |
| worker_thread.reset(); |
| } |
| cleanupExitedWorkerThreads(true /* force */); |
| CleanupExitedSlaves(); |
| } |
| |
| // Kvrocks threads list: |
| // - Worker-thread: processes clients' connections and requests |
| // - Task-runner: a thread pool that handles jobs which may freeze the server if run directly |
| // - Cron-thread: the server's crontab; cleans backups, resizes SST and memtable sizes |
| // - Compaction-checker: triggers compactions according to collected statistics |
| // - Replication-thread: replicates the incremental stream from the master when in slave role; some |
| // dynamic threads are spawned to fetch files during full sync. |
| // - fetch-file-thread: fetches SST files from the master |
| // - Feed-slave-thread: feeds data to slaves if there are any; some dynamic threads are also spawned |
| // during full sync. TODO(@shooterit): we should manage these threads uniformly. |
| // - feed-replica-data-info: generates a checkpoint and sends the file list during full sync |
| // - feed-replica-file: sends SST files when slaves ask for full sync |
| Status Server::Start() { |
| auto s = namespace_.LoadAndRewrite(); |
| if (!s.IsOK()) { |
| return s; |
| } |
| if (!config_->master_host.empty()) { |
| s = AddMaster(config_->master_host, static_cast<uint32_t>(config_->master_port), false); |
| if (!s.IsOK()) return s; |
| } else { |
| // Generate new replication id if not a replica |
| engine::Context ctx(storage); |
| s = storage->ShiftReplId(ctx); |
| if (!s.IsOK()) { |
| return s.Prefixed("failed to shift replication id"); |
| } |
| } |
| |
| if (!config_->cluster_enabled) { |
| engine::Context no_txn_ctx = engine::Context::NoTransactionContext(storage); |
| GET_OR_RET(index_mgr.Load(no_txn_ctx, kDefaultNamespace)); |
| for (const auto &[_, ns] : namespace_.List()) { |
| GET_OR_RET(index_mgr.Load(no_txn_ctx, ns)); |
| } |
| } |
| |
| if (config_->cluster_enabled) { |
| // Create objects used for slot migration |
| slot_migrator = std::make_unique<SlotMigrator>(this); |
| |
| if (config_->persist_cluster_nodes_enabled) { |
| auto s = cluster->LoadClusterNodes(config_->NodesFilePath()); |
| if (!s.IsOK()) { |
| return s.Prefixed("failed to load cluster nodes info"); |
| } |
| } |
| |
| auto s = slot_migrator->CreateMigrationThread(); |
| if (!s.IsOK()) { |
| return s.Prefixed("failed to create migration thread"); |
| } |
| |
| slot_import = std::make_unique<SlotImport>(this); |
| } |
| |
| for (const auto &worker : worker_threads_) { |
| worker->Start(); |
| } |
| |
| if (auto s = task_runner_.Start(); !s) { |
| warn("Failed to start task runner: {}", s.Msg()); |
| } |
| // setup server cron thread |
| cron_thread_ = GET_OR_RET(util::CreateThread("server-cron", [this] { this->cron(); })); |
| |
| compaction_checker_thread_ = GET_OR_RET(util::CreateThread("compact-check", [this] { |
| uint64_t counter = 0; |
| int64_t last_compact_date = 0; |
| CompactionChecker compaction_checker{this->storage}; |
| |
| while (!stop_) { |
| // Sleep first |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| |
| // To guarantee safe access to the DB |
| auto guard = storage->ReadLockGuard(); |
| if (storage->IsClosing()) continue; |
| |
| if (!is_loading_ && ++counter % 600 == 0 // check every minute |
| && config_->compaction_checker_cron.IsEnabled()) { |
| auto t_now = static_cast<time_t>(util::GetTimeStamp()); |
| std::tm now{}; |
| localtime_r(&t_now, &now); |
| if (config_->compaction_checker_cron.IsTimeMatch(&now)) { |
| const auto &column_family_list = engine::ColumnFamilyConfigs::ListAllColumnFamilies(); |
| for (auto &column_family : column_family_list) { |
| compaction_checker.PickCompactionFilesForCf(column_family); |
| } |
| } |
| // compact once per day |
| auto now_hours = t_now / 3600; |
| if (now_hours != 0 && last_compact_date != now_hours / 24) { |
| last_compact_date = now_hours / 24; |
| compaction_checker.CompactPropagateAndPubSubFiles(); |
| } |
| } |
| } |
| })); |
| |
| memory_startup_use_.store(Stats::GetMemoryRSS(), std::memory_order_relaxed); |
| info("[server] Ready to accept connections"); |
| |
| return Status::OK(); |
| } |
| |
| void Server::Stop() { |
| stop_ = true; |
| |
| slaveof_mu_.lock(); |
| if (replication_thread_) replication_thread_->Stop(); |
| slaveof_mu_.unlock(); |
| |
| for (const auto &worker : worker_threads_) { |
| worker->Stop(0 /* immediately terminate */); |
| } |
| |
| task_runner_.Cancel(); |
| } |
| |
| void Server::Join() { |
| if (auto s = util::ThreadJoin(cron_thread_); !s) { |
| warn("Cron thread operation failed: {}", s.Msg()); |
| } |
| if (auto s = util::ThreadJoin(compaction_checker_thread_); !s) { |
| warn("Compaction checker thread operation failed: {}", s.Msg()); |
| } |
| if (auto s = task_runner_.Join(); !s) { |
| warn("{}", s.Msg()); |
| } |
| for (const auto &worker : worker_threads_) { |
| worker->Join(); |
| } |
| } |
| |
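| // Starts (or restarts) replication from the given master: any existing replication thread is |
| // stopped first and a new one is created. When 'force_reconnect' is false, the call is a no-op |
| // if the master host and port are unchanged. |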
| Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reconnect) { |
| std::lock_guard<std::mutex> guard(slaveof_mu_); |
| |
| // Don't check host and port if 'force_reconnect' argument is set to true |
| if (!force_reconnect && !master_host_.empty() && master_host_ == host && master_port_ == port) { |
| return Status::OK(); |
| } |
| |
| // The master has changed |
| if (!master_host_.empty()) { |
| if (replication_thread_) replication_thread_->Stop(); |
| replication_thread_ = nullptr; |
| } |
| |
| // For a master running an old version, it uses the replication thread to implement |
| // replication and uses 'listen-port + 1' as the thread's listening port. |
| uint32_t master_listen_port = port; |
| if (GetConfig()->master_use_repl_port) master_listen_port += 1; |
| |
| replication_thread_ = std::make_unique<ReplicationThread>(host, master_listen_port, this); |
| auto s = replication_thread_->Start([this]() { return PrepareRestoreDB(); }, |
| [this]() { |
| this->is_loading_ = false; |
| if (auto s = task_runner_.Start(); !s) { |
| warn("Failed to start task runner: {}", s.Msg()); |
| } |
| }); |
| if (s.IsOK()) { |
| master_host_ = host; |
| master_port_ = port; |
| config_->SetMaster(host, port); |
| } else { |
| replication_thread_ = nullptr; |
| } |
| return s; |
| } |
| |
| Status Server::RemoveMaster() { |
| std::lock_guard<std::mutex> guard(slaveof_mu_); |
| |
| if (!master_host_.empty()) { |
| master_host_.clear(); |
| master_port_ = 0; |
| config_->ClearMaster(); |
| if (replication_thread_) { |
| replication_thread_->Stop(); |
| replication_thread_ = nullptr; |
| } |
| engine::Context ctx(storage); |
| return storage->ShiftReplId(ctx); |
| } |
| return Status::OK(); |
| } |
| |
| Status Server::AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq) { |
| auto t = std::make_unique<FeedSlaveThread>(this, conn, next_repl_seq); |
| auto s = t->Start(); |
| if (!s.IsOK()) { |
| return s; |
| } |
| |
| std::lock_guard<std::mutex> lg(slave_threads_mu_); |
| slave_threads_.emplace_back(std::move(t)); |
| return Status::OK(); |
| } |
| |
| void Server::DisconnectSlaves() { |
| std::lock_guard<std::mutex> lg(slave_threads_mu_); |
| |
| for (auto &slave_thread : slave_threads_) { |
| if (!slave_thread->IsStopped()) slave_thread->Stop(); |
| } |
| |
| while (!slave_threads_.empty()) { |
| auto slave_thread = std::move(slave_threads_.front()); |
| slave_threads_.pop_front(); |
| slave_thread->Join(); |
| } |
| } |
| |
| void Server::CleanupExitedSlaves() { |
| std::lock_guard<std::mutex> lg(slave_threads_mu_); |
| |
| for (auto it = slave_threads_.begin(); it != slave_threads_.end();) { |
| if ((*it)->IsStopped()) { |
| auto thread = std::move(*it); |
| it = slave_threads_.erase(it); |
| thread->Join(); |
| } else { |
| ++it; |
| } |
| } |
| } |
| |
| void Server::FeedMonitorConns(redis::Connection *conn, const std::vector<std::string> &tokens) { |
| if (monitor_clients_ <= 0) return; |
| |
| auto now_us = util::GetTimeStampUS(); |
| std::string output = |
| fmt::format("{}.{} [{} {}]", now_us / 1000000, now_us % 1000000, conn->GetNamespace(), conn->GetAddr()); |
| for (const auto &token : tokens) { |
| output += " \""; |
| output += util::EscapeString(token); |
| output += "\""; |
| } |
| |
| for (const auto &worker_thread : worker_threads_) { |
| auto worker = worker_thread->GetWorker(); |
| worker->FeedMonitorConns(conn, redis::SimpleString(output)); |
| } |
| } |
| |
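| // Delivers 'msg' to every client subscribed to 'channel', either directly or via a matching |
| // pattern subscription, and returns the number of clients that received it. |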
| int Server::PublishMessage(const std::string &channel, const std::string &msg) { |
| int cnt = 0; |
| int index = 0; |
| |
| pubsub_channels_mu_.lock(); |
| |
| std::vector<ConnContext> to_publish_conn_ctxs; |
| if (auto iter = pubsub_channels_.find(channel); iter != pubsub_channels_.end()) { |
| for (const auto &conn_ctx : iter->second) { |
| to_publish_conn_ctxs.emplace_back(conn_ctx); |
| } |
| } |
| |
| // The 'patterns' variable records the matched pattern for each connection |
| std::vector<std::string> patterns; |
| std::vector<ConnContext> to_publish_patterns_conn_ctxs; |
| for (const auto &iter : pubsub_patterns_) { |
| if (util::StringMatch(iter.first, channel, false)) { |
| for (const auto &conn_ctx : iter.second) { |
| to_publish_patterns_conn_ctxs.emplace_back(conn_ctx); |
| patterns.emplace_back(iter.first); |
| } |
| } |
| } |
| pubsub_channels_mu_.unlock(); |
| |
| std::string channel_reply; |
| channel_reply.append(redis::MultiLen(3)); |
| channel_reply.append(redis::BulkString("message")); |
| channel_reply.append(redis::BulkString(channel)); |
| channel_reply.append(redis::BulkString(msg)); |
| for (const auto &conn_ctx : to_publish_conn_ctxs) { |
| auto s = conn_ctx.owner->Reply(conn_ctx.fd, channel_reply); |
| if (s.IsOK()) { |
| cnt++; |
| } |
| } |
| |
| // Publish the corresponding pattern and message to pattern-subscribed connections |
| for (const auto &conn_ctx : to_publish_patterns_conn_ctxs) { |
| std::string pattern_reply; |
| pattern_reply.append(redis::MultiLen(4)); |
| pattern_reply.append(redis::BulkString("pmessage")); |
| pattern_reply.append(redis::BulkString(patterns[index++])); |
| pattern_reply.append(redis::BulkString(channel)); |
| pattern_reply.append(redis::BulkString(msg)); |
| auto s = conn_ctx.owner->Reply(conn_ctx.fd, pattern_reply); |
| if (s.IsOK()) { |
| cnt++; |
| } |
| } |
| |
| return cnt; |
| } |
| |
| void Server::SubscribeChannel(const std::string &channel, redis::Connection *conn) { |
| std::lock_guard<std::mutex> guard(pubsub_channels_mu_); |
| |
| auto conn_ctx = ConnContext(conn->Owner(), conn->GetFD()); |
| if (auto iter = pubsub_channels_.find(channel); iter == pubsub_channels_.end()) { |
| pubsub_channels_.emplace(channel, std::list<ConnContext>{conn_ctx}); |
| } else { |
| iter->second.emplace_back(conn_ctx); |
| } |
| } |
| |
| void Server::UnsubscribeChannel(const std::string &channel, redis::Connection *conn) { |
| std::lock_guard<std::mutex> guard(pubsub_channels_mu_); |
| |
| auto iter = pubsub_channels_.find(channel); |
| if (iter == pubsub_channels_.end()) { |
| return; |
| } |
| |
| for (const auto &conn_ctx : iter->second) { |
| if (conn->GetFD() == conn_ctx.fd && conn->Owner() == conn_ctx.owner) { |
| iter->second.remove(conn_ctx); |
| if (iter->second.empty()) { |
| pubsub_channels_.erase(iter); |
| } |
| break; |
| } |
| } |
| } |
| |
| void Server::GetChannelsByPattern(const std::string &pattern, std::vector<std::string> *channels) { |
| std::lock_guard<std::mutex> guard(pubsub_channels_mu_); |
| |
| for (const auto &iter : pubsub_channels_) { |
| if (pattern.empty() || util::StringMatch(pattern, iter.first, false)) { |
| channels->emplace_back(iter.first); |
| } |
| } |
| } |
| |
| void Server::ListChannelSubscribeNum(const std::vector<std::string> &channels, |
| std::vector<ChannelSubscribeNum> *channel_subscribe_nums) { |
| std::lock_guard<std::mutex> guard(pubsub_channels_mu_); |
| |
| for (const auto &chan : channels) { |
| if (auto iter = pubsub_channels_.find(chan); iter != pubsub_channels_.end()) { |
| channel_subscribe_nums->emplace_back(ChannelSubscribeNum{iter->first, iter->second.size()}); |
| } else { |
| channel_subscribe_nums->emplace_back(ChannelSubscribeNum{chan, 0}); |
| } |
| } |
| } |
| |
| void Server::PSubscribeChannel(const std::string &pattern, redis::Connection *conn) { |
| std::lock_guard<std::mutex> guard(pubsub_channels_mu_); |
| |
| auto conn_ctx = ConnContext(conn->Owner(), conn->GetFD()); |
| if (auto iter = pubsub_patterns_.find(pattern); iter == pubsub_patterns_.end()) { |
| pubsub_patterns_.emplace(pattern, std::list<ConnContext>{conn_ctx}); |
| } else { |
| iter->second.emplace_back(conn_ctx); |
| } |
| } |
| |
| void Server::PUnsubscribeChannel(const std::string &pattern, redis::Connection *conn) { |
| std::lock_guard<std::mutex> guard(pubsub_channels_mu_); |
| |
| auto iter = pubsub_patterns_.find(pattern); |
| if (iter == pubsub_patterns_.end()) { |
| return; |
| } |
| |
| for (const auto &conn_ctx : iter->second) { |
| if (conn->GetFD() == conn_ctx.fd && conn->Owner() == conn_ctx.owner) { |
| iter->second.remove(conn_ctx); |
| if (iter->second.empty()) { |
| pubsub_patterns_.erase(iter); |
| } |
| break; |
| } |
| } |
| } |
| |
| void Server::SSubscribeChannel(const std::string &channel, redis::Connection *conn, uint16_t slot) { |
| assert((config_->cluster_enabled && slot < HASH_SLOTS_SIZE) || slot == 0); |
| std::lock_guard<std::mutex> guard(pubsub_shard_channels_mu_); |
| |
| auto conn_ctx = ConnContext(conn->Owner(), conn->GetFD()); |
| if (auto iter = pubsub_shard_channels_[slot].find(channel); iter == pubsub_shard_channels_[slot].end()) { |
| pubsub_shard_channels_[slot].emplace(channel, std::list<ConnContext>{conn_ctx}); |
| } else { |
| iter->second.emplace_back(conn_ctx); |
| } |
| } |
| |
| void Server::SUnsubscribeChannel(const std::string &channel, redis::Connection *conn, uint16_t slot) { |
| assert((config_->cluster_enabled && slot < HASH_SLOTS_SIZE) || slot == 0); |
| std::lock_guard<std::mutex> guard(pubsub_shard_channels_mu_); |
| |
| auto iter = pubsub_shard_channels_[slot].find(channel); |
| if (iter == pubsub_shard_channels_[slot].end()) { |
| return; |
| } |
| |
| for (const auto &conn_ctx : iter->second) { |
| if (conn->GetFD() == conn_ctx.fd && conn->Owner() == conn_ctx.owner) { |
| iter->second.remove(conn_ctx); |
| if (iter->second.empty()) { |
| pubsub_shard_channels_[slot].erase(iter); |
| } |
| break; |
| } |
| } |
| } |
| |
| void Server::GetSChannelsByPattern(const std::string &pattern, std::vector<std::string> *channels) { |
| std::lock_guard<std::mutex> guard(pubsub_shard_channels_mu_); |
| |
| for (const auto &shard_channels : pubsub_shard_channels_) { |
| for (const auto &iter : shard_channels) { |
| if (pattern.empty() || util::StringMatch(pattern, iter.first, false)) { |
| channels->emplace_back(iter.first); |
| } |
| } |
| } |
| } |
| |
| void Server::ListSChannelSubscribeNum(const std::vector<std::string> &channels, |
| std::vector<ChannelSubscribeNum> *channel_subscribe_nums) { |
| std::lock_guard<std::mutex> guard(pubsub_shard_channels_mu_); |
| |
| for (const auto &chan : channels) { |
| uint16_t slot = config_->cluster_enabled ? GetSlotIdFromKey(chan) : 0; |
| if (auto iter = pubsub_shard_channels_[slot].find(chan); iter != pubsub_shard_channels_[slot].end()) { |
| channel_subscribe_nums->emplace_back(ChannelSubscribeNum{iter->first, iter->second.size()}); |
| } else { |
| channel_subscribe_nums->emplace_back(ChannelSubscribeNum{chan, 0}); |
| } |
| } |
| } |
| |
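| // Registers the connection as blocked on the given key (used by blocking commands); |
| // WakeupBlockingConns() re-enables its write event once the key becomes ready. |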
| void Server::BlockOnKey(const std::string &key, redis::Connection *conn) { |
| std::lock_guard<std::mutex> guard(blocking_keys_mu_); |
| |
| auto conn_ctx = ConnContext(conn->Owner(), conn->GetFD()); |
| |
| if (auto iter = blocking_keys_.find(key); iter == blocking_keys_.end()) { |
| blocking_keys_.emplace(key, std::list<ConnContext>{conn_ctx}); |
| } else { |
| iter->second.emplace_back(conn_ctx); |
| } |
| |
| IncrBlockedClientNum(); |
| } |
| |
| void Server::UnblockOnKey(const std::string &key, redis::Connection *conn) { |
| std::lock_guard<std::mutex> guard(blocking_keys_mu_); |
| |
| auto iter = blocking_keys_.find(key); |
| if (iter == blocking_keys_.end()) { |
| return; |
| } |
| |
| for (const auto &conn_ctx : iter->second) { |
| if (conn->GetFD() == conn_ctx.fd && conn->Owner() == conn_ctx.owner) { |
| iter->second.remove(conn_ctx); |
| if (iter->second.empty()) { |
| blocking_keys_.erase(iter); |
| } |
| break; |
| } |
| } |
| |
| DecrBlockedClientNum(); |
| } |
| |
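| // Registers the connection as a blocked consumer on each stream key, remembering the entry ID |
| // it is waiting after, so that OnEntryAddedToStream() only wakes it up for newer entries. |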
| void Server::BlockOnStreams(const std::vector<std::string> &keys, const std::vector<redis::StreamEntryID> &entry_ids, |
| redis::Connection *conn) { |
| std::lock_guard<std::mutex> guard(blocked_stream_consumers_mu_); |
| |
| IncrBlockedClientNum(); |
| |
| for (size_t i = 0; i < keys.size(); ++i) { |
| auto consumer = std::make_shared<StreamConsumer>(conn->Owner(), conn->GetFD(), conn->GetNamespace(), entry_ids[i]); |
| if (auto iter = blocked_stream_consumers_.find(keys[i]); iter == blocked_stream_consumers_.end()) { |
| std::set<std::shared_ptr<StreamConsumer>> consumers; |
| consumers.insert(consumer); |
| blocked_stream_consumers_.emplace(keys[i], consumers); |
| } else { |
| iter->second.insert(consumer); |
| } |
| } |
| } |
| |
| void Server::UnblockOnStreams(const std::vector<std::string> &keys, redis::Connection *conn) { |
| std::lock_guard<std::mutex> guard(blocked_stream_consumers_mu_); |
| |
| DecrBlockedClientNum(); |
| |
| for (const auto &key : keys) { |
| auto iter = blocked_stream_consumers_.find(key); |
| if (iter == blocked_stream_consumers_.end()) { |
| continue; |
| } |
| |
| for (auto it = iter->second.begin(); it != iter->second.end();) { |
| const auto &consumer = *it; |
| if (conn->GetFD() == consumer->fd && conn->Owner() == consumer->owner) { |
| iter->second.erase(it); |
| if (iter->second.empty()) { |
| blocked_stream_consumers_.erase(iter); |
| } |
| break; |
| } |
| ++it; |
| } |
| } |
| } |
| |
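| // Wakes up at most 'n_conns' clients blocked on the given key by enabling their write events, |
| // in the order they blocked. |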
| void Server::WakeupBlockingConns(const std::string &key, size_t n_conns) { |
| std::lock_guard<std::mutex> guard(blocking_keys_mu_); |
| |
| auto iter = blocking_keys_.find(key); |
| if (iter == blocking_keys_.end() || iter->second.empty()) { |
| return; |
| } |
| |
| while (n_conns-- && !iter->second.empty()) { |
| auto conn_ctx = iter->second.front(); |
| auto s = conn_ctx.owner->EnableWriteEvent(conn_ctx.fd); |
| if (!s.IsOK()) { |
| error("[server] Failed to enable write event on blocked client {}: {}", conn_ctx.fd, s.Msg()); |
| } |
| iter->second.pop_front(); |
| } |
| } |
| |
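| // Called when a new entry is appended to a stream: wakes up blocked consumers in the same |
| // namespace whose awaited entry ID is older than the newly added one. |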
| void Server::OnEntryAddedToStream(const std::string &ns, const std::string &key, const redis::StreamEntryID &entry_id) { |
| std::lock_guard<std::mutex> guard(blocked_stream_consumers_mu_); |
| |
| auto iter = blocked_stream_consumers_.find(key); |
| if (iter == blocked_stream_consumers_.end() || iter->second.empty()) { |
| return; |
| } |
| |
| for (auto it = iter->second.begin(); it != iter->second.end();) { |
| auto consumer = *it; |
| if (consumer->ns == ns && entry_id > consumer->last_consumed_id) { |
| auto s = consumer->owner->EnableWriteEvent(consumer->fd); |
| if (!s.IsOK()) { |
| error("[server] Failed to enable write event on blocked stream consumer {}: {}", consumer->fd, s.Msg()); |
| } |
| it = iter->second.erase(it); |
| } else { |
| ++it; |
| } |
| } |
| } |
| |
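| // Refreshes the cached unix timestamp; the cron loop calls this periodically so hot paths can |
| // read the cached value instead of querying the system clock. |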
| void Server::updateCachedTime() { unix_time_secs.store(util::GetTimeStamp()); } |
| |
| int Server::IncrClientNum() { |
| total_clients_.fetch_add(1, std::memory_order_relaxed); |
| return connected_clients_.fetch_add(1, std::memory_order_relaxed); |
| } |
| |
| int Server::DecrClientNum() { return connected_clients_.fetch_sub(1, std::memory_order_relaxed); } |
| |
| int Server::IncrMonitorClientNum() { return monitor_clients_.fetch_add(1, std::memory_order_relaxed); } |
| |
| int Server::DecrMonitorClientNum() { return monitor_clients_.fetch_sub(1, std::memory_order_relaxed); } |
| |
| int Server::IncrBlockedClientNum() { return blocked_clients_.fetch_add(1, std::memory_order_relaxed); } |
| |
| int Server::DecrBlockedClientNum() { return blocked_clients_.fetch_sub(1, std::memory_order_relaxed); } |
| |
| std::shared_lock<std::shared_mutex> Server::WorkConcurrencyGuard() { |
| return std::shared_lock(works_concurrency_rw_lock_); |
| } |
| |
| std::unique_lock<std::shared_mutex> Server::WorkExclusivityGuard() { |
| return std::unique_lock(works_concurrency_rw_lock_); |
| } |
| |
| uint64_t Server::GetClientID() { return client_id_.fetch_add(1, std::memory_order_relaxed); } |
| |
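| // Samples the cumulative command, network, and RocksDB counters into the instantaneous |
| // metric tracker; called from the server cron loop. |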
| void Server::recordInstantaneousMetrics() { |
| auto rocksdb_stats = storage->GetDB()->GetDBOptions().statistics; |
| stats.TrackInstantaneousMetric(STATS_METRIC_COMMAND, stats.total_calls); |
| stats.TrackInstantaneousMetric(STATS_METRIC_NET_INPUT, stats.in_bytes); |
| stats.TrackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, stats.out_bytes); |
| stats.TrackInstantaneousMetric(STATS_METRIC_ROCKSDB_PUT, |
| rocksdb_stats->getTickerCount(rocksdb::Tickers::NUMBER_KEYS_WRITTEN)); |
| stats.TrackInstantaneousMetric(STATS_METRIC_ROCKSDB_GET, |
| rocksdb_stats->getTickerCount(rocksdb::Tickers::NUMBER_KEYS_READ)); |
| stats.TrackInstantaneousMetric(STATS_METRIC_ROCKSDB_MULTIGET, |
| rocksdb_stats->getTickerCount(rocksdb::Tickers::NUMBER_MULTIGET_KEYS_READ)); |
| stats.TrackInstantaneousMetric(STATS_METRIC_ROCKSDB_SEEK, |
| rocksdb_stats->getTickerCount(rocksdb::Tickers::NUMBER_DB_SEEK)); |
| stats.TrackInstantaneousMetric(STATS_METRIC_ROCKSDB_NEXT, |
| rocksdb_stats->getTickerCount(rocksdb::Tickers::NUMBER_DB_NEXT)); |
| stats.TrackInstantaneousMetric(STATS_METRIC_ROCKSDB_PREV, |
| rocksdb_stats->getTickerCount(rocksdb::Tickers::NUMBER_DB_PREV)); |
| } |
| |
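| // The server cron loop: ticks every 100ms and uses the tick counter to schedule periodic jobs |
| // (e.g. counter % 200 == 0 means every 20 seconds). |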
| void Server::cron() { |
| uint64_t counter = 0; |
| while (!stop_) { |
| // Sleep first |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| |
| // To guarantee safe access to the DB |
| auto guard = storage->ReadLockGuard(); |
| if (storage->IsClosing()) continue; |
| |
| updateCachedTime(); |
| counter++; |
| |
| if (is_loading_) { |
| // We need to skip the cron operations since `is_loading_` means the db is being restored, |
| // and the db pointer will be replaced after that. It would panic if we used the db pointer |
| // before the new db is reopened. |
| continue; |
| } |
| |
| // check every 20s (use 20s instead of 60s so that the cron jobs still get a chance to run under critical conditions) |
| if (counter != 0 && counter % 200 == 0) { |
| auto t = static_cast<time_t>(util::GetTimeStamp()); |
| std::tm now{}; |
| localtime_r(&t, &now); |
| // disable the compaction cron when the compaction checker is enabled |
| if (!config_->compaction_checker_cron.IsEnabled() && config_->compact_cron.IsEnabled() && |
| config_->compact_cron.IsTimeMatch(&now)) { |
| Status s = AsyncCompactDB(); |
| info("[server] Schedule to compact the db, result: {}", s.Msg()); |
| } |
| if (config_->bgsave_cron.IsEnabled() && config_->bgsave_cron.IsTimeMatch(&now)) { |
| Status s = AsyncBgSaveDB(); |
| info("[server] Schedule to bgsave the db, result: {}", s.Msg()); |
| } |
| if (config_->dbsize_scan_cron.IsEnabled() && config_->dbsize_scan_cron.IsTimeMatch(&now)) { |
| auto tokens = namespace_.List(); |
| std::vector<std::string> namespaces; |
| |
| // Number of namespaces (custom namespaces + default one) |
| namespaces.reserve(tokens.size() + 1); |
| for (auto &token : tokens) { |
| namespaces.emplace_back(token.second); // namespace |
| } |
| |
| // add default namespace as fallback |
| namespaces.emplace_back(kDefaultNamespace); |
| |
| for (auto &ns : namespaces) { |
| Status s = AsyncScanDBSize(ns); |
| info("[server] Schedule to recalculate the db size on namespace: {}, result: {}", ns, s.Msg()); |
| } |
| } |
| } |
| // check every 10s |
| if (counter != 0 && counter % 100 == 0) { |
| Status s = AsyncPurgeOldBackups(config_->max_backup_to_keep, config_->max_backup_keep_hours); |
| |
| // Purge backups if needed; keeping backups and full sync checkpoints |
| // at the same time costs a lot of disk space |
| if (config_->purge_backup_on_fullsync && (storage->ExistCheckpoint() || storage->ExistSyncCheckpoint())) { |
| s = AsyncPurgeOldBackups(0, 0); |
| } |
| } |
| |
| // If no replica is using this checkpoint, we can remove it. |
| if (counter != 0 && counter % 100 == 0) { |
| int64_t create_time_secs = storage->GetCheckpointCreateTimeSecs(); |
| int64_t access_time_secs = storage->GetCheckpointAccessTimeSecs(); |
| |
| if (storage->ExistCheckpoint()) { |
| // TODO(shooterit): support to config the alive time of checkpoint |
| int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>(); |
| if ((GetFetchFileThreadNum() == 0 && now_secs - access_time_secs > 30) || |
| (now_secs - create_time_secs > 24 * 60 * 60)) { |
| auto s = rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options()); |
| if (!s.ok()) { |
| warn("[server] Fail to clean checkpoint, error: {}", s.ToString()); |
| } else { |
| info("[server] Clean checkpoint successfully"); |
| } |
| } |
| } |
| } |
| // Check every minute whether the DB needs to be resumed. |
| // RocksDB has an auto-resume feature after a retryable IO error; earlier versions (before v6.22.1) |
| // had a bug when encountering a no-space error. The current version fixes the no-space error issue, |
| // but the problem is not completely resolved: it still exists when a disk-quota-exceeded error is |
| // encountered. In order to properly handle all possible situations in RocksDB, we manually resume |
| // here when encountering no-space and disk-quota-exceeded errors. |
| if (counter != 0 && counter % 600 == 0 && storage->IsDBInRetryableIOError()) { |
| auto s = storage->GetDB()->Resume(); |
| if (s.ok()) { |
| warn("[server] Successfully resumed DB after retryable IO error"); |
| } else { |
| error("[server] Failed to resume DB after retryable IO error: {}", s.ToString()); |
| } |
| storage->SetDBInRetryableIOError(false); |
| } |
| |
| // check if we need to clean up exited worker threads every 5s |
| if (counter != 0 && counter % 50 == 0) { |
| cleanupExitedWorkerThreads(false); |
| } |
| |
| CleanupExitedSlaves(); |
| recordInstantaneousMetrics(); |
| } |
| } |
| |
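| // Collects RocksDB properties, per-column-family statistics, and block cache counters |
| // for the 'rocksdb' section of the INFO command. |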
| Server::InfoEntries Server::GetRocksDBInfo() { |
| InfoEntries entries; |
| if (is_loading_) return entries; |
| |
| rocksdb::DB *db = storage->GetDB(); |
| |
| uint64_t memtable_sizes = 0, cur_memtable_sizes = 0, num_snapshots = 0, num_running_flushes = 0; |
| uint64_t num_immutable_tables = 0, memtable_flush_pending = 0, compaction_pending = 0; |
| uint64_t num_running_compaction = 0, num_live_versions = 0, num_super_version = 0, num_background_errors = 0; |
| |
| db->GetAggregatedIntProperty(rocksdb::DB::Properties::kSizeAllMemTables, &memtable_sizes); |
| db->GetAggregatedIntProperty(rocksdb::DB::Properties::kCurSizeAllMemTables, &cur_memtable_sizes); |
| db->GetAggregatedIntProperty(rocksdb::DB::Properties::kNumImmutableMemTable, &num_immutable_tables); |
| db->GetAggregatedIntProperty(rocksdb::DB::Properties::kMemTableFlushPending, &memtable_flush_pending); |
| db->GetAggregatedIntProperty(rocksdb::DB::Properties::kCurrentSuperVersionNumber, &num_super_version); |
| db->GetAggregatedIntProperty(rocksdb::DB::Properties::kBackgroundErrors, &num_background_errors); |
| db->GetAggregatedIntProperty(rocksdb::DB::Properties::kCompactionPending, &compaction_pending); |
| db->GetAggregatedIntProperty(rocksdb::DB::Properties::kNumLiveVersions, &num_live_versions); |
| |
| { |
| // All column families share the same block cache, so it's good to count a single one. |
| uint64_t block_cache_usage = 0; |
| uint64_t block_cache_pinned_usage = 0; |
| auto subkey_cf_handle = storage->GetCFHandle(ColumnFamilyID::PrimarySubkey); |
| db->GetIntProperty(subkey_cf_handle, rocksdb::DB::Properties::kBlockCacheUsage, &block_cache_usage); |
| entries.emplace_back("block_cache_usage", block_cache_usage); |
| db->GetIntProperty(subkey_cf_handle, rocksdb::DB::Properties::kBlockCachePinnedUsage, &block_cache_pinned_usage); |
| entries.emplace_back("block_cache_pinned_usage[" + subkey_cf_handle->GetName() + "]", block_cache_pinned_usage); |
| |
| // All column families share the same DB-level properties, so it's good to count a single one. |
| db->GetIntProperty(subkey_cf_handle, rocksdb::DB::Properties::kNumSnapshots, &num_snapshots); |
| db->GetIntProperty(subkey_cf_handle, rocksdb::DB::Properties::kNumRunningFlushes, &num_running_flushes); |
| db->GetIntProperty(subkey_cf_handle, rocksdb::DB::Properties::kNumRunningCompactions, &num_running_compaction); |
| } |
| |
| for (const auto &cf_handle : *storage->GetCFHandles()) { |
| uint64_t estimate_keys = 0; |
| uint64_t index_and_filter_cache_usage = 0; |
| std::map<std::string, std::string> cf_stats_map; |
| db->GetIntProperty(cf_handle, rocksdb::DB::Properties::kEstimateNumKeys, &estimate_keys); |
| entries.emplace_back("estimate_keys[" + cf_handle->GetName() + "]", estimate_keys); |
| db->GetIntProperty(cf_handle, rocksdb::DB::Properties::kEstimateTableReadersMem, &index_and_filter_cache_usage); |
| entries.emplace_back("index_and_filter_cache_usage[" + cf_handle->GetName() + "]", index_and_filter_cache_usage); |
| db->GetMapProperty(cf_handle, rocksdb::DB::Properties::kCFStats, &cf_stats_map); |
| entries.emplace_back("level0_file_limit_slowdown[" + cf_handle->GetName() + "]", |
| cf_stats_map["l0-file-count-limit-delays"]); |
| entries.emplace_back("level0_file_limit_stop[" + cf_handle->GetName() + "]", |
| cf_stats_map["l0-file-count-limit-stops"]); |
| entries.emplace_back("pending_compaction_bytes_slowdown[" + cf_handle->GetName() + "]", |
| cf_stats_map["pending-compaction-bytes-delays"]); |
| entries.emplace_back("pending_compaction_bytes_stop[" + cf_handle->GetName() + "]", |
| cf_stats_map["pending-compaction-bytes-stops"]); |
| entries.emplace_back("level0_file_limit_stop_with_ongoing_compaction[" + cf_handle->GetName() + "]", |
| cf_stats_map["cf-l0-file-count-limit-stops-with-ongoing-compaction"]); |
| entries.emplace_back("level0_file_limit_slowdown_with_ongoing_compaction[" + cf_handle->GetName() + "]", |
| cf_stats_map["cf-l0-file-count-limit-delays-with-ongoing-compaction"]); |
| entries.emplace_back("memtable_count_limit_slowdown[" + cf_handle->GetName() + "]", |
| cf_stats_map["memtable-limit-delays"]); |
| entries.emplace_back("memtable_count_limit_stop[" + cf_handle->GetName() + "]", |
| cf_stats_map["memtable-limit-stops"]); |
| } |
| |
| auto rocksdb_stats = storage->GetDB()->GetDBOptions().statistics; |
| if (rocksdb_stats) { |
| std::map<std::string, uint32_t> block_cache_stats = { |
| {"block_cache_hit", rocksdb::Tickers::BLOCK_CACHE_HIT}, |
| {"block_cache_index_hit", rocksdb::Tickers::BLOCK_CACHE_INDEX_HIT}, |
| {"block_cache_filter_hit", rocksdb::Tickers::BLOCK_CACHE_FILTER_HIT}, |
| {"block_cache_data_hit", rocksdb::Tickers::BLOCK_CACHE_DATA_HIT}, |
| {"block_cache_miss", rocksdb::Tickers::BLOCK_CACHE_MISS}, |
| {"block_cache_index_miss", rocksdb::Tickers::BLOCK_CACHE_INDEX_MISS}, |
| {"block_cache_filter_miss", rocksdb::Tickers::BLOCK_CACHE_FILTER_MISS}, |
| {"block_cache_data_miss", rocksdb::Tickers::BLOCK_CACHE_DATA_MISS}, |
| }; |
| for (const auto &iter : block_cache_stats) { |
| entries.emplace_back(iter.first, rocksdb_stats->getTickerCount(iter.second)); |
| } |
| } |
| |
| entries.emplace_back("all_mem_tables", memtable_sizes); |
| entries.emplace_back("cur_mem_tables", cur_memtable_sizes); |
| entries.emplace_back("snapshots", num_snapshots); |
| entries.emplace_back("num_immutable_tables", num_immutable_tables); |
| entries.emplace_back("num_running_flushes", num_running_flushes); |
| entries.emplace_back("memtable_flush_pending", memtable_flush_pending); |
| entries.emplace_back("compaction_pending", compaction_pending); |
| entries.emplace_back("num_running_compactions", num_running_compaction); |
| entries.emplace_back("num_live_versions", num_live_versions); |
| entries.emplace_back("num_super_version", num_super_version); |
| entries.emplace_back("num_background_errors", num_background_errors); |
| auto db_stats = storage->GetDBStats(); |
| entries.emplace_back("flush_count", db_stats->flush_count.load()); |
| entries.emplace_back("compaction_count", db_stats->compaction_count.load()); |
| entries.emplace_back("put_per_sec", stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_PUT)); |
| entries.emplace_back("get_per_sec", stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_GET) + |
| stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_MULTIGET)); |
| entries.emplace_back("seek_per_sec", stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_SEEK)); |
| entries.emplace_back("next_per_sec", stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_NEXT)); |
| entries.emplace_back("prev_per_sec", stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_PREV)); |
| db_job_mu_.lock(); |
| entries.emplace_back("is_bgsaving", (is_bgsave_in_progress_ ? "yes" : "no")); |
| entries.emplace_back("is_compacting", (db_compacting_ ? "yes" : "no")); |
| db_job_mu_.unlock(); |
| |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetServerInfo() { |
| static int call_uname = 1; |
| static utsname name; |
| if (call_uname) { |
| /* Uname can be slow and is always the same output. Cache it. */ |
| uname(&name); |
| call_uname = 0; |
| } |
| |
| Server::InfoEntries entries; |
| entries.emplace_back("version", VERSION); // deprecated |
| entries.emplace_back("kvrocks_version", VERSION); |
| entries.emplace_back("redis_version", REDIS_VERSION); |
| entries.emplace_back("git_sha1", GIT_COMMIT); // deprecated |
| entries.emplace_back("kvrocks_git_sha1", GIT_COMMIT); |
| entries.emplace_back("redis_mode", (config_->cluster_enabled ? "cluster" : "standalone")); |
| entries.emplace_back("kvrocks_mode", (config_->cluster_enabled ? "cluster" : "standalone")); |
| entries.emplace_back("os", fmt::format("{} {} {}", name.sysname, name.release, name.machine)); |
| #if defined(__GNUC__) && !defined(__clang__) |
| entries.emplace_back("gcc_version", fmt::format("{}.{}.{}", __GNUC__, __GNUC_MINOR__, __GNUC_PATCHLEVEL__)); |
| #endif |
| #ifdef __clang__ |
| entries.emplace_back("clang_version", |
| fmt::format("{}.{}.{}", __clang_major__, __clang_minor__, __clang_patchlevel__)); |
| #endif |
| entries.emplace_back("rocksdb_version", fmt::format("{}.{}.{}", ROCKSDB_MAJOR, ROCKSDB_MINOR, ROCKSDB_PATCH)); |
| entries.emplace_back("arch_bits", sizeof(void *) * 8); |
| entries.emplace_back("process_id", getpid()); |
| entries.emplace_back("tcp_port", config_->port); |
| entries.emplace_back("server_time_usec", util::GetTimeStampUS()); |
| int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>(); |
| entries.emplace_back("uptime_in_seconds", now_secs - start_time_secs_); |
| entries.emplace_back("uptime_in_days", (now_secs - start_time_secs_) / 86400); |
| #ifdef __linux__ |
| if (auto exec_path = realpath("/proc/self/exe", nullptr)) { |
| entries.emplace_back("executable", exec_path); |
| free(exec_path); // NOLINT(cppcoreguidelines-no-malloc) |
| } |
| #endif |
| entries.emplace_back("config_file", config_->ConfigFilePath()); |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetClientsInfo() { |
| InfoEntries entries; |
| entries.emplace_back("maxclients", config_->maxclients); |
| entries.emplace_back("connected_clients", connected_clients_.load()); |
| entries.emplace_back("monitor_clients", monitor_clients_.load()); |
| entries.emplace_back("blocked_clients", blocked_clients_.load()); |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetMemoryInfo() { |
| int64_t rss = Stats::GetMemoryRSS(); |
| int64_t memory_lua = 0; |
| for (auto &wt : worker_threads_) { |
| memory_lua += wt->GetWorker()->GetLuaMemorySize(); |
| } |
| std::string used_memory_rss_human = util::BytesToHuman(rss); |
| std::string used_memory_lua_human = util::BytesToHuman(memory_lua); |
| |
| InfoEntries entries; |
| entries.emplace_back("used_memory_rss", rss); |
| entries.emplace_back("used_memory_rss_human", used_memory_rss_human); |
| entries.emplace_back("used_memory_lua", memory_lua); |
| entries.emplace_back("used_memory_lua_human", used_memory_lua_human); |
| entries.emplace_back("used_memory_startup", memory_startup_use_.load(std::memory_order_relaxed)); |
| entries.emplace_back("mem_allocator", memory_profiler.AllocatorName()); |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetReplicationInfo() { |
| InfoEntries entries; |
| if (is_loading_) return entries; |
| |
| entries.emplace_back("role", (IsSlave() ? "slave" : "master")); |
| if (IsSlave()) { |
| int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>(); |
| entries.emplace_back("master_host", master_host_); |
| entries.emplace_back("master_port", master_port_); |
| ReplState state = GetReplicationState(); |
| entries.emplace_back("master_link_status", (state == kReplConnected ? "up" : "down")); |
| entries.emplace_back("master_sync_unrecoverable_error", (state == kReplError ? "yes" : "no")); |
| entries.emplace_back("master_sync_in_progress", (state == kReplFetchMeta || state == kReplFetchSST)); |
| entries.emplace_back("master_last_io_seconds_ago", now_secs - replication_thread_->LastIOTimeSecs()); |
| entries.emplace_back("slave_repl_offset", storage->LatestSeqNumber()); |
| entries.emplace_back("slave_priority", config_->slave_priority); |
| } |
| |
| int idx = 0; |
| rocksdb::SequenceNumber latest_seq = storage->LatestSeqNumber(); |
| |
| slave_threads_mu_.lock(); |
| entries.emplace_back("connected_slaves", slave_threads_.size()); |
| for (const auto &slave : slave_threads_) { |
| if (slave->IsStopped()) continue; |
| |
| entries.emplace_back("slave" + std::to_string(idx), |
| fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetAnnounceIP(), |
| slave->GetConn()->GetAnnouncePort(), slave->GetCurrentReplSeq(), |
| latest_seq - slave->GetCurrentReplSeq())); |
| ++idx; |
| } |
| slave_threads_mu_.unlock(); |
| |
| entries.emplace_back("master_repl_offset", latest_seq); |
| |
| return entries; |
| } |
| |
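| // Builds the RESP reply describing this node's replication role: role, master address, link state |
| // and offset when acting as a slave, or the replication offset and connected slaves when a master. |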
| std::string Server::GetRoleInfo() { |
| if (IsSlave()) { |
| std::vector<std::string> roles; |
| roles.emplace_back("slave"); |
| roles.emplace_back(master_host_); |
| roles.emplace_back(std::to_string(master_port_)); |
| |
| auto state = GetReplicationState(); |
| if (state == kReplConnected) { |
| roles.emplace_back("connected"); |
| } else if (state == kReplFetchMeta || state == kReplFetchSST) { |
| roles.emplace_back("sync"); |
| } else { |
| roles.emplace_back("connecting"); |
| } |
| roles.emplace_back(std::to_string(storage->LatestSeqNumber())); |
| return redis::ArrayOfBulkStrings(roles); |
| } else { |
| std::vector<std::string> list; |
| |
| slave_threads_mu_.lock(); |
| for (const auto &slave : slave_threads_) { |
| if (slave->IsStopped()) continue; |
| |
| list.emplace_back(redis::ArrayOfBulkStrings({ |
| slave->GetConn()->GetAnnounceIP(), |
| std::to_string(slave->GetConn()->GetListeningPort()), |
| std::to_string(slave->GetCurrentReplSeq()), |
| })); |
| } |
| slave_threads_mu_.unlock(); |
| |
| auto multi_len = 2; |
| if (!list.empty()) { |
| multi_len = 3; |
| } |
| std::string info; |
| info.append(redis::MultiLen(multi_len)); |
| info.append(redis::BulkString("master")); |
| info.append(redis::BulkString(std::to_string(storage->LatestSeqNumber()))); |
| if (!list.empty()) { |
| info.append(redis::Array(list)); |
| } |
| return info; |
| } |
| } |
| |
| std::string Server::GetLastRandomKeyCursor() { |
| std::string cursor; |
| std::lock_guard<std::mutex> guard(last_random_key_cursor_mu_); |
| cursor = last_random_key_cursor_; |
| return cursor; |
| } |
| |
| void Server::SetLastRandomKeyCursor(const std::string &cursor) { |
| std::lock_guard<std::mutex> guard(last_random_key_cursor_mu_); |
| last_random_key_cursor_ = cursor; |
| } |
| |
| int64_t Server::GetCachedUnixTime() { |
| if (unix_time_secs.load() == 0) { |
| updateCachedTime(); |
| } |
| return unix_time_secs.load(); |
| } |
| |
| int64_t Server::GetLastBgsaveTime() { |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| return last_bgsave_timestamp_secs_ == -1 ? start_time_secs_ : last_bgsave_timestamp_secs_; |
| } |
| |
| Server::InfoEntries Server::GetStatsInfo() { |
| Server::InfoEntries entries; |
| entries.emplace_back("total_connections_received", total_clients_.load()); |
| entries.emplace_back("total_commands_processed", stats.total_calls.load()); |
| entries.emplace_back("instantaneous_ops_per_sec", stats.GetInstantaneousMetric(STATS_METRIC_COMMAND)); |
| entries.emplace_back("total_net_input_bytes", stats.in_bytes.load()); |
| entries.emplace_back("total_net_output_bytes", stats.out_bytes.load()); |
| entries.emplace_back("instantaneous_input_kbps", |
| static_cast<float>(stats.GetInstantaneousMetric(STATS_METRIC_NET_INPUT) / 1024)); |
| entries.emplace_back("instantaneous_output_kbps", |
| static_cast<float>(stats.GetInstantaneousMetric(STATS_METRIC_NET_OUTPUT) / 1024)); |
| entries.emplace_back("sync_full", stats.fullsync_count.load()); |
| entries.emplace_back("sync_partial_ok", stats.psync_ok_count.load()); |
| entries.emplace_back("sync_partial_err", stats.psync_err_count.load()); |
| |
| auto db_stats = storage->GetDBStats(); |
| entries.emplace_back("keyspace_hits", db_stats->keyspace_hits.load()); |
| entries.emplace_back("keyspace_misses", db_stats->keyspace_misses.load()); |
| |
| { |
| std::lock_guard<std::mutex> lg(pubsub_channels_mu_); |
| entries.emplace_back("pubsub_channels", pubsub_channels_.size()); |
| entries.emplace_back("pubsub_patterns", pubsub_patterns_.size()); |
| } |
| |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetCommandsStatsInfo() { |
| InfoEntries entries; |
| |
| for (const auto &cmd_stat : stats.commands_stats) { |
| auto calls = cmd_stat.second.calls.load(); |
| if (calls == 0) continue; |
| |
| auto latency = cmd_stat.second.latency.load(); |
| entries.emplace_back("cmdstat_" + cmd_stat.first, |
| fmt::format("calls={},usec={},usec_per_call={}", calls, latency, |
| static_cast<double>(latency) / static_cast<double>(calls))); |
| } |
| |
| for (const auto &cmd_hist : stats.commands_histogram) { |
| auto command_name = cmd_hist.first; |
| auto calls = stats.commands_histogram[command_name].calls.load(); |
| if (calls == 0) continue; |
| |
| auto sum = stats.commands_histogram[command_name].sum.load(); |
| std::string result; |
| for (std::size_t i{0}; i < stats.commands_histogram[command_name].buckets.size(); ++i) { |
| auto bucket_value = stats.commands_histogram[command_name].buckets[i]->load(); |
| auto bucket_bound = std::numeric_limits<double>::infinity(); |
| if (i < stats.bucket_boundaries.size()) { |
| bucket_bound = stats.bucket_boundaries[i]; |
| } |
| |
| result.append(fmt::format("{}={},", bucket_bound, bucket_value)); |
| } |
| result.append(fmt::format("sum={},count={}", sum, calls)); |
| |
| entries.emplace_back("cmdstathist_" + command_name, result); |
| } |
| |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetClusterInfo() { |
| InfoEntries entries; |
| |
| entries.emplace_back("cluster_enabled", config_->cluster_enabled); |
| |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetPersistenceInfo() { |
| InfoEntries entries; |
| |
| entries.emplace_back("loading", is_loading_.load()); |
| |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| entries.emplace_back("bgsave_in_progress", is_bgsave_in_progress_); |
| entries.emplace_back("last_bgsave_time", |
| (last_bgsave_timestamp_secs_ == -1 ? start_time_secs_ : last_bgsave_timestamp_secs_)); |
| entries.emplace_back("last_bgsave_status", last_bgsave_status_); |
| entries.emplace_back("last_bgsave_time_sec", last_bgsave_duration_secs_); |
| |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetCpuInfo() { // NOLINT(readability-convert-member-functions-to-static) |
| InfoEntries entries; |
| |
| rusage self_ru; |
| getrusage(RUSAGE_SELF, &self_ru); |
| entries.emplace_back("used_cpu_sys", static_cast<float>(self_ru.ru_stime.tv_sec) + |
| static_cast<float>(self_ru.ru_stime.tv_usec / 1000000)); |
| entries.emplace_back("used_cpu_user", static_cast<float>(self_ru.ru_utime.tv_sec) + |
| static_cast<float>(self_ru.ru_utime.tv_usec / 1000000)); |
| |
| return entries; |
| } |
| |
| Server::InfoEntries Server::GetKeyspaceInfo(const std::string &ns) { |
| InfoEntries entries; |
| if (is_loading_) return entries; |
| |
| KeyNumStats stats; |
| GetLatestKeyNumStats(ns, &stats); |
| auto last_dbsize_scan_timestamp = static_cast<time_t>(GetLastScanTime(ns)); |
| |
| entries.emplace_back("last_dbsize_scan_timestamp", last_dbsize_scan_timestamp); |
| entries.emplace_back("db0", fmt::format("keys={},expires={},avg_ttl={},expired={}", stats.n_key, stats.n_expires, |
| stats.avg_ttl, stats.n_expired)); |
| entries.emplace_back("sequence", storage->GetDB()->GetLatestSequenceNumber()); |
| entries.emplace_back("used_db_size", storage->GetTotalSize(ns)); |
| entries.emplace_back("max_db_size", config_->max_db_size * GiB); |
| double used_percent = config_->max_db_size ? static_cast<double>(storage->GetTotalSize() * 100) / |
| static_cast<double>(config_->max_db_size * GiB) |
| : 0; |
| entries.emplace_back("used_percent", fmt::format("{}%", used_percent)); |
| |
| struct statvfs stat; |
| if (statvfs(config_->db_dir.c_str(), &stat) == 0) { |
| auto disk_capacity = stat.f_blocks * stat.f_frsize; |
| auto used_disk_size = (stat.f_blocks - stat.f_bavail) * stat.f_frsize; |
| entries.emplace_back("disk_capacity", disk_capacity); |
| entries.emplace_back("used_disk_size", used_disk_size); |
| double used_disk_percent = static_cast<double>(used_disk_size * 100) / static_cast<double>(disk_capacity); |
| entries.emplace_back("used_disk_percent", fmt::format("{}%", used_disk_percent)); |
| } |
| |
| return entries; |
| } |
| |
| // WARNING: we must not access the DB (i.e. RocksDB) when the server is loading since the |
| // DB is closed and the pointer is invalid. The server may crash if we access the DB during loading. |
| // If you add new fields which access the DB into the INFO command output, make sure |
| // this section can't be shown when loading (i.e. !is_loading_). |
| std::string Server::GetInfo(const std::string &ns, const std::vector<std::string> §ions) { |
| std::vector<std::pair<std::string, std::function<InfoEntries(Server *)>>> info_funcs = { |
| {"Server", &Server::GetServerInfo}, {"Clients", &Server::GetClientsInfo}, |
| {"Memory", &Server::GetMemoryInfo}, {"Persistence", &Server::GetPersistenceInfo}, |
| {"Stats", &Server::GetStatsInfo}, {"Replication", &Server::GetReplicationInfo}, |
| {"CPU", &Server::GetCpuInfo}, {"CommandStats", &Server::GetCommandsStatsInfo}, |
| {"Cluster", &Server::GetClusterInfo}, {"Keyspace", [&ns](Server *srv) { return srv->GetKeyspaceInfo(ns); }}, |
| {"RocksDB", &Server::GetRocksDBInfo}, |
| }; |
| |
| std::string info_str; |
| |
| bool all = sections.empty() || util::FindICase(sections.begin(), sections.end(), "all") != sections.end(); |
| |
| bool first = true; |
| for (const auto &[sec, fn] : info_funcs) { |
| if (all || util::FindICase(sections.begin(), sections.end(), sec) != sections.end()) { |
| if (first) |
| first = false; |
| else |
| info_str.append("\r\n"); |
| |
| info_str.append("# " + sec + "\r\n"); |
| |
| for (const auto &entry : fn(this)) { |
| info_str.append(fmt::format("{}:{}\r\n", entry.name, entry.val)); |
| } |
| } |
| } |
| |
| return info_str; |
| } |
| |
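| // Serializes all RocksDB tickers and histograms (P50/P95/P99/P100/COUNT/SUM) into a JSON string. |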
| std::string Server::GetRocksDBStatsJson() const { |
| jsoncons::json stats_json; |
| |
| auto stats = storage->GetDB()->GetDBOptions().statistics; |
| for (const auto &iter : rocksdb::TickersNameMap) { |
| stats_json[iter.second] = stats->getTickerCount(iter.first); |
| } |
| |
| for (const auto &iter : rocksdb::HistogramsNameMap) { |
| rocksdb::HistogramData hist_data; |
| stats->histogramData(iter.first, &hist_data); |
| /* P50 P95 P99 P100 COUNT SUM */ |
| stats_json[iter.second] = |
| jsoncons::json(jsoncons::json_array_arg, {hist_data.median, hist_data.percentile95, hist_data.percentile99, |
| hist_data.max, hist_data.count, hist_data.sum}); |
| } |
| |
| return stats_json.to_string(); |
| } |
| |
| // This function is called by the replication thread when it has finished fetching all files from its master. |
| // Before restoring the db from a backup or checkpoint, we should guarantee that no other |
| // thread accesses the DB and its column families, then close the db. |
| bool Server::PrepareRestoreDB() { |
| // Stop the threads feeding slaves |
| info("[server] Disconnecting slaves..."); |
| DisconnectSlaves(); |
| |
| // If the DB is restored, the object 'db_' will be destroyed, but 'db_' is still |
| // accessed by the data migration task. To avoid invalid access, the data migration |
| // task should be stopped before restoring the DB |
| WaitNoMigrateProcessing(); |
| |
| // Workers are disallowed from running commands that may access the DB, so we enable this |
| // flag to stop workers from running new commands, and then wait until the exclusive |
| // guard can be acquired to guarantee that no worker is still running one. |
| is_loading_ = true; |
| |
| // To guarantee that worker threads don't access the DB, we acquire and release the exclusive |
| // lock ASAP instead of holding it, so users aren't left without responses for a long time, |
| // because the following 'CloseDB' may take a long time to acquire the DB mutex. |
| info("[server] Waiting workers for finishing executing commands..."); |
| while (!works_concurrency_rw_lock_.try_lock()) { |
| if (replication_thread_->IsStopped()) { |
| is_loading_ = false; |
| return false; |
| } |
| usleep(1000); |
| } |
| works_concurrency_rw_lock_.unlock(); |
| |
| // Stop task runner |
| info("[server] Stopping the task runner and clear task queue..."); |
| task_runner_.Cancel(); |
| if (auto s = task_runner_.Join(); !s) { |
| warn("[server] {}", s.Msg()); |
| } |
| |
| // The cron thread, compaction checker thread, and full synchronization thread may still be |
| // running in the background; we need to close the db so they can't actually operate on it. |
| info("[server] Waiting for closing DB..."); |
| storage->CloseDB(); |
| return true; |
| } |
| |
| void Server::WaitNoMigrateProcessing() { |
| if (config_->cluster_enabled) { |
| info("[server] Waiting until no migration task is running..."); |
| slot_migrator->SetStopMigrationFlag(true); |
| while (slot_migrator->GetCurrentSlotMigrationStage() != SlotMigrationStage::kNone) { |
| usleep(500); |
| } |
| } |
| } |
| |
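| // Schedules a manual compaction over [begin_key, end_key] (the whole keyspace if both are empty) |
| // on the task runner; only one compaction job may be in progress at a time. |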
| Status Server::AsyncCompactDB(const std::string &begin_key, const std::string &end_key) { |
| if (is_loading_) { |
| return {Status::NotOK, "loading in-progress"}; |
| } |
| |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| if (db_compacting_) { |
| return {Status::NotOK, "compact in-progress"}; |
| } |
| |
| db_compacting_ = true; |
| |
| return task_runner_.TryPublish([begin_key, end_key, this] { |
| std::unique_ptr<Slice> begin = nullptr, end = nullptr; |
| if (!begin_key.empty()) begin = std::make_unique<Slice>(begin_key); |
| if (!end_key.empty()) end = std::make_unique<Slice>(end_key); |
| |
| auto s = storage->Compact(nullptr, begin.get(), end.get()); |
| if (!s.ok()) { |
| error("[task runner] Failed to do compaction: {}", s.ToString()); |
| } |
| |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| db_compacting_ = false; |
| }); |
| } |
| |
| Status Server::AsyncBgSaveDB() { |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| if (is_bgsave_in_progress_) { |
| return {Status::NotOK, "bgsave in-progress"}; |
| } |
| |
| is_bgsave_in_progress_ = true; |
| |
| return task_runner_.TryPublish([this] { |
| auto start_bgsave_time_secs = util::GetTimeStamp<std::chrono::seconds>(); |
| Status s = storage->CreateBackup(); |
| auto stop_bgsave_time_secs = util::GetTimeStamp<std::chrono::seconds>(); |
| |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| is_bgsave_in_progress_ = false; |
| last_bgsave_timestamp_secs_ = start_bgsave_time_secs; |
| last_bgsave_status_ = s.IsOK() ? "ok" : "err"; |
| last_bgsave_duration_secs_ = stop_bgsave_time_secs - start_bgsave_time_secs; |
| }); |
| } |
| |
| Status Server::AsyncPurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours) { |
| return task_runner_.TryPublish([num_backups_to_keep, backup_max_keep_hours, this] { |
| storage->PurgeOldBackups(num_backups_to_keep, backup_max_keep_hours); |
| }); |
| } |
| |
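| // Schedules a background scan of the given namespace to refresh its key number statistics; |
| // at most one scan per namespace runs at a time. |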
| Status Server::AsyncScanDBSize(const std::string &ns) { |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| |
| if (auto iter = db_scan_infos_.find(ns); iter == db_scan_infos_.end()) { |
| db_scan_infos_[ns] = DBScanInfo{}; |
| } |
| |
| if (db_scan_infos_[ns].is_scanning) { |
| return {Status::NotOK, "scanning the db now"}; |
| } |
| |
| db_scan_infos_[ns].is_scanning = true; |
| |
| return task_runner_.TryPublish([ns, this] { |
| redis::Database db(storage, ns); |
| |
| KeyNumStats stats; |
| engine::Context ctx(storage); |
| auto s = db.GetKeyNumStats(ctx, "", &stats); |
| if (!s.ok()) { |
| error("failed to retrieve key num stats: {}", s.ToString()); |
| } |
| |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| |
| db_scan_infos_[ns].key_num_stats = stats; |
| db_scan_infos_[ns].last_scan_time_secs = util::GetTimeStamp(); |
| db_scan_infos_[ns].is_scanning = false; |
| }); |
| } |
| |
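| // autoResizeBlockAndSST derives new values for rocksdb.target_file_size_base, rocksdb.write_buffer_size, |
| // and rocksdb.block_size from the average value size (total_size / total_keys), applies them, and |
| // rewrites the config file when any of them changes. |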
| Status Server::autoResizeBlockAndSST() { |
| auto total_size = storage->GetTotalSize(kDefaultNamespace); |
| uint64_t total_keys = 0, estimate_keys = 0; |
| for (const auto &cf_handle : *storage->GetCFHandles()) { |
| storage->GetDB()->GetIntProperty(cf_handle, rocksdb::DB::Properties::kEstimateNumKeys, &estimate_keys); |
| total_keys += estimate_keys; |
| } |
| |
| if (total_size == 0 || total_keys == 0) { |
| return Status::OK(); |
| } |
| |
| auto average_kv_size = total_size / total_keys; |
| int target_file_size_base = 0; |
| int block_size = 0; |
| if (average_kv_size > 512 * KiB) { |
| target_file_size_base = 1024; |
| block_size = 1 * MiB; |
| } else if (average_kv_size > 256 * KiB) { |
| target_file_size_base = 512; |
| block_size = 512 * KiB; |
| } else if (average_kv_size > 32 * KiB) { |
| target_file_size_base = 256; |
| block_size = 256 * KiB; |
| } else if (average_kv_size > 1 * KiB) { |
| target_file_size_base = 128; |
| block_size = 32 * KiB; |
| } else if (average_kv_size > 128) { |
| target_file_size_base = 64; |
| block_size = 8 * KiB; |
| } else { |
| target_file_size_base = 16; |
| block_size = 2 * KiB; |
| } |
| |
| if (target_file_size_base == config_->rocks_db.target_file_size_base && |
| target_file_size_base == config_->rocks_db.write_buffer_size && block_size == config_->rocks_db.block_size) { |
| return Status::OK(); |
| } |
| |
| if (target_file_size_base != config_->rocks_db.target_file_size_base) { |
| auto old_target_file_size_base = config_->rocks_db.target_file_size_base; |
| auto s = config_->Set(this, "rocksdb.target_file_size_base", std::to_string(target_file_size_base)); |
| info( |
| "[server] Resize rocksdb.target_file_size_base from {} to {}, " |
| "average_kv_size: {}, total_size: {}, total_keys: {}, result: {}", |
| old_target_file_size_base, target_file_size_base, average_kv_size, total_size, total_keys, s.Msg()); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| |
| if (target_file_size_base != config_->rocks_db.write_buffer_size) { |
| auto old_write_buffer_size = config_->rocks_db.write_buffer_size; |
| auto s = config_->Set(this, "rocksdb.write_buffer_size", std::to_string(target_file_size_base)); |
| info( |
| "[server] Resize rocksdb.write_buffer_size from {} to {}, " |
| "average_kv_size: {}, total_size: {}, " |
| "total_keys: {}, result: {}", |
| old_write_buffer_size, target_file_size_base, average_kv_size, total_size, total_keys, s.Msg()); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| |
| if (block_size != config_->rocks_db.block_size) { |
| auto s = storage->SetOptionForAllColumnFamilies("table_factory.block_size", std::to_string(block_size)); |
| info( |
| "[server] Resize rocksdb.block_size from {} to {}, " |
| "average_kv_size: {}, total_size: {}, " |
| "total_keys: {}, result: {}", |
| config_->rocks_db.block_size, block_size, average_kv_size, total_size, total_keys, s.Msg()); |
| if (!s.IsOK()) { |
| return s; |
| } |
| |
| config_->rocks_db.block_size = block_size; |
| } |
| |
| auto s = config_->Rewrite(namespace_.List()); |
| info("[server] Rewrite config, result: {}", s.Msg()); |
| |
| return Status::OK(); |
| } |
| |
| void Server::GetLatestKeyNumStats(const std::string &ns, KeyNumStats *stats) { |
| // Look up the namespace while holding db_job_mu_, since AsyncScanDBSize may insert into |
| // db_scan_infos_ concurrently. |
| std::lock_guard<std::mutex> lg(db_job_mu_); |
| auto iter = db_scan_infos_.find(ns); |
| if (iter != db_scan_infos_.end()) { |
| *stats = iter->second.key_num_stats; |
| } |
| } |
| |
| int64_t Server::GetLastScanTime(const std::string &ns) const { |
| auto iter = db_scan_infos_.find(ns); |
| if (iter != db_scan_infos_.end()) { |
| return iter->second.last_scan_time_secs; |
| } |
| return 0; |
| } |
| |
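| // PollUpdates reads up to `count` write batches from the WAL starting at `next_sequence`. |
| // With `is_strict` set, the first batch must carry exactly `next_sequence`; otherwise an error is returned. |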
| StatusOr<std::vector<rocksdb::BatchResult>> Server::PollUpdates(uint64_t next_sequence, int64_t count, |
| bool is_strict) const { |
| std::vector<rocksdb::BatchResult> batches; |
| auto latest_sequence = storage->LatestSeqNumber(); |
| if (next_sequence == latest_sequence + 1) { |
| // Return an empty result if there are no new updates. |
| return batches; |
| } else if (next_sequence > latest_sequence + 1) { |
| return {Status::NotOK, "next sequence is out of range"}; |
| } |
| |
| std::unique_ptr<rocksdb::TransactionLogIterator> iter; |
| if (auto s = storage->GetWALIter(next_sequence, &iter); !s.IsOK()) return s; |
| if (!iter) { |
| return Status{Status::NotOK, "unable to get WAL iterator"}; |
| } |
| |
| for (int64_t i = 0; i < count && iter->Valid() && iter->status().ok(); ++i, iter->Next()) { |
| // The first batch should have the same sequence number as the requested next sequence |
| // number if strict matching is required. |
| auto batch = iter->GetBatch(); |
| if (i == 0 && is_strict && batch.sequence != next_sequence) { |
| return {Status::NotOK, |
| fmt::format("mismatched sequence number, expected {} but got {}", next_sequence, batch.sequence)}; |
| } |
| batches.emplace_back(std::move(batch)); |
| } |
| return batches; |
| } |
| |
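| // SlowlogPushEntryIfNeeded records the command in the slow log when its duration exceeds the |
| // slowlog-log-slower-than threshold, keeping at most kSlowLogMaxArgc arguments and truncating |
| // arguments longer than kSlowLogMaxString bytes. |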
| void Server::SlowlogPushEntryIfNeeded(const std::vector<std::string> *args, uint64_t duration, |
| const redis::Connection *conn) { |
| int64_t threshold = config_->slowlog_log_slower_than; |
| if (threshold < 0 || static_cast<int64_t>(duration) < threshold) return; |
| |
| auto entry = std::make_unique<SlowEntry>(); |
| size_t argc = args->size() > kSlowLogMaxArgc ? kSlowLogMaxArgc : args->size(); |
| for (size_t i = 0; i < argc; i++) { |
| if (argc != args->size() && i == argc - 1) { |
| entry->args.emplace_back(fmt::format("... ({} more arguments)", args->size() - argc + 1)); |
| break; |
| } |
| |
| if ((*args)[i].length() <= kSlowLogMaxString) { |
| entry->args.emplace_back((*args)[i]); |
| } else { |
| entry->args.emplace_back(fmt::format("{}... ({} more bytes)", (*args)[i].substr(0, kSlowLogMaxString), |
| (*args)[i].length() - kSlowLogMaxString)); |
| } |
| } |
| |
| entry->duration = duration; |
| entry->client_name = conn->GetName(); |
| entry->ip = conn->GetIP(); |
| entry->port = conn->GetPort(); |
| slow_log_.PushEntry(std::move(entry)); |
| } |
| |
| std::string Server::GetClientsStr() { |
| std::string clients; |
| for (const auto &t : worker_threads_) { |
| clients.append(t->GetWorker()->GetClientsStr()); |
| } |
| |
| std::lock_guard<std::mutex> guard(slave_threads_mu_); |
| |
| for (const auto &st : slave_threads_) { |
| clients.append(st->GetConn()->ToString()); |
| } |
| |
| return clients; |
| } |
| |
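| // KillClient walks over normal/pubsub clients, slave connections, and the master link, closing |
| // every connection that matches the given address, id, or type; *killed is set to the number of |
| // connections affected. |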
| void Server::KillClient(int64_t *killed, const std::string &addr, uint64_t id, uint64_t type, bool skipme, |
| redis::Connection *conn) { |
| *killed = 0; |
| |
| // Normal clients and pubsub clients |
| for (const auto &t : worker_threads_) { |
| int64_t killed_in_worker = 0; |
| t->GetWorker()->KillClient(conn, id, addr, type, skipme, &killed_in_worker); |
| *killed += killed_in_worker; |
| } |
| |
| // Slave clients |
| slave_threads_mu_.lock(); |
| for (const auto &st : slave_threads_) { |
| if ((type & kTypeSlave) || |
| (!addr.empty() && (st->GetConn()->GetAddr() == addr || st->GetConn()->GetAnnounceAddr() == addr)) || |
| (id != 0 && st->GetConn()->GetID() == id)) { |
| st->Stop(); |
| (*killed)++; |
| } |
| } |
| slave_threads_mu_.unlock(); |
| |
| // Master client |
| if (IsSlave() && |
| (type & kTypeMaster || (!addr.empty() && addr == master_host_ + ":" + std::to_string(master_port_)))) { |
| // Stop replication thread and start a new one to replicate |
| if (auto s = AddMaster(master_host_, master_port_, true); !s.IsOK()) { |
| error("[server] Failed to add master {}:{} with error: {}", master_host_, master_port_, s.Msg()); |
| } |
| (*killed)++; |
| } |
| } |
| |
| ReplState Server::GetReplicationState() { |
| std::lock_guard<std::mutex> guard(slaveof_mu_); |
| if (IsSlave() && replication_thread_) { |
| return replication_thread_->State(); |
| } |
| return kReplConnecting; |
| } |
| |
| StatusOr<std::unique_ptr<redis::Commander>> Server::LookupAndCreateCommand(const std::string &cmd_name) { |
| if (cmd_name.empty()) return {Status::RedisUnknownCmd}; |
| |
| auto commands = redis::CommandTable::Get(); |
| auto cmd_iter = commands->find(util::ToLower(cmd_name)); |
| if (cmd_iter == commands->end()) { |
| return {Status::RedisUnknownCmd}; |
| } |
| |
| auto cmd_attr = cmd_iter->second; |
| auto cmd = cmd_attr->factory(); |
| cmd->SetAttributes(cmd_attr); |
| |
| return std::move(cmd); |
| } |
| |
| Status Server::ScriptExists(const std::string &sha) const { |
| std::string body; |
| return ScriptGet(sha, &body); |
| } |
| |
| Status Server::ScriptGet(const std::string &sha, std::string *body) const { |
| std::string func_name = engine::kLuaFuncSHAPrefix + sha; |
| auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate); |
| engine::Context ctx(storage); |
| auto s = storage->Get(ctx, ctx.GetReadOptions(), cf, func_name, body); |
| if (!s.ok()) { |
| return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()}; |
| } |
| return Status::OK(); |
| } |
| |
| Status Server::ScriptSet(const std::string &sha, const std::string &body) const { |
| std::string func_name = engine::kLuaFuncSHAPrefix + sha; |
| engine::Context ctx(storage); |
| return storage->WriteToPropagateCF(ctx, func_name, body); |
| } |
| |
| Status Server::FunctionGetCode(const std::string &lib, std::string *code) const { |
| std::string func_name = engine::kLuaLibCodePrefix + lib; |
| auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate); |
| engine::Context ctx(storage); |
| auto s = storage->Get(ctx, ctx.GetReadOptions(), cf, func_name, code); |
| if (!s.ok()) { |
| return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()}; |
| } |
| return Status::OK(); |
| } |
| |
| Status Server::FunctionGetLib(const std::string &func, std::string *lib) const { |
| std::string func_name = engine::kLuaFuncLibPrefix + func; |
| auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate); |
| engine::Context ctx(storage); |
| auto s = storage->Get(ctx, ctx.GetReadOptions(), cf, func_name, lib); |
| if (!s.ok()) { |
| return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()}; |
| } |
| return Status::OK(); |
| } |
| |
| Status Server::FunctionSetCode(const std::string &lib, const std::string &code) const { |
| std::string func_name = engine::kLuaLibCodePrefix + lib; |
| engine::Context ctx(storage); |
| return storage->WriteToPropagateCF(ctx, func_name, code); |
| } |
| |
| Status Server::FunctionSetLib(const std::string &func, const std::string &lib) const { |
| std::string func_name = engine::kLuaFuncLibPrefix + func; |
| engine::Context ctx(storage); |
| return storage->WriteToPropagateCF(ctx, func_name, lib); |
| } |
| |
| void Server::ScriptReset() { |
| for (auto &wt : worker_threads_) { |
| wt->GetWorker()->LuaReset(); |
| } |
| } |
| |
| Status Server::ScriptFlush() { |
| auto cf = storage->GetCFHandle(ColumnFamilyID::Propagate); |
| engine::Context ctx(storage); |
| auto s = storage->FlushScripts(ctx, storage->DefaultWriteOptions(), cf); |
| if (!s.ok()) return {Status::NotOK, s.ToString()}; |
| ScriptReset(); |
| return Status::OK(); |
| } |
| |
| // Generally, we store data into RocksDB and just replicate WAL instead of propagating |
| // commands. But sometimes, we need to update inner states or do special operations |
| // for specific commands, such as `script flush`. |
| // channel: commands serving the same purpose are put into the same channel so they can be handled uniformly |
| // tokens: the serialized command tokens |
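| // For example (illustrative), tokens {"script", "flush"} are serialized to the RESP array |
| // "*2\r\n$6\r\nscript\r\n$5\r\nflush\r\n" and written to the propagate column family under the channel key. |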
| Status Server::Propagate(const std::string &channel, const std::vector<std::string> &tokens) const { |
| std::string value = redis::MultiLen(tokens.size()); |
| for (const auto &iter : tokens) { |
| value += redis::BulkString(iter); |
| } |
| engine::Context ctx(storage); |
| return storage->WriteToPropagateCF(ctx, channel, value); |
| } |
| |
| Status Server::ExecPropagateScriptCommand(const std::vector<std::string> &tokens) { |
| auto subcommand = util::ToLower(tokens[1]); |
| if (subcommand == "flush") { |
| ScriptReset(); |
| } |
| return Status::OK(); |
| } |
| |
| Status Server::ExecPropagatedCommand(const std::vector<std::string> &tokens) { |
| if (tokens.empty()) return Status::OK(); |
| |
| auto command = util::ToLower(tokens[0]); |
| if (command == "script" && tokens.size() >= 2) { |
| return ExecPropagateScriptCommand(tokens); |
| } |
| |
| return Status::OK(); |
| } |
| |
| // AdjustOpenFilesLimit only tries its best to raise the max open files according to |
| // the max clients and the RocksDB open file configuration. It also reserves a number |
| // of file descriptors (128) for extra operations such as persistence, listening sockets, |
| // log files and so forth. |
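| // For example (illustrative), with maxclients = 10000 and rocksdb.max_open_files = 8096, |
| // the target limit is 10000 + 8096 + 128 = 18224 file descriptors. |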
| void Server::AdjustOpenFilesLimit() { |
| const int min_reserved_fds = 128; |
| auto rocksdb_max_open_file = static_cast<rlim_t>(config_->rocks_db.max_open_files); |
| auto max_clients = static_cast<rlim_t>(config_->maxclients); |
| auto max_files = max_clients + rocksdb_max_open_file + min_reserved_fds; |
| |
| rlimit limit; |
| if (getrlimit(RLIMIT_NOFILE, &limit) == -1) { |
| return; |
| } |
| |
| rlim_t old_limit = limit.rlim_cur; |
| // Set the max number of files only if the current limit is not enough |
| if (old_limit >= max_files) { |
| return; |
| } |
| |
| int setrlimit_error = 0; |
| rlim_t best_limit = max_files; |
| |
| while (best_limit > old_limit) { |
| limit.rlim_cur = best_limit; |
| limit.rlim_max = best_limit; |
| if (setrlimit(RLIMIT_NOFILE, &limit) != -1) break; |
| |
| setrlimit_error = errno; |
| |
| rlim_t decr_step = 16; |
| if (best_limit < decr_step) { |
| best_limit = old_limit; |
| break; |
| } |
| |
| best_limit -= decr_step; |
| } |
| |
| if (best_limit < old_limit) best_limit = old_limit; |
| |
| if (best_limit < max_files) { |
| if (best_limit <= static_cast<rlim_t>(min_reserved_fds)) { |
| warn( |
| "[server] Your current 'ulimit -n' of {} is not enough for the server to start. " |
| "Please increase your open file limit to at least {}. Exiting.", |
| old_limit, max_files); |
| exit(1); |
| } |
| |
| warn( |
| "[server] You requested max clients of {} and RocksDB max open files of {} " |
| "requiring at least {} max file descriptors.", |
| max_clients, rocksdb_max_open_file, max_files); |
| |
| warn( |
| "[server] Server can't set maximum open files to {} " |
| "because of OS error: {}", |
| max_files, strerror(setrlimit_error)); |
| } else { |
| warn("[server] Increased maximum number of open files to {} (it's originally set to {})", max_files, old_limit); |
| } |
| } |
| |
| void Server::AdjustWorkerThreads() { |
| auto new_worker_threads = static_cast<size_t>(config_->workers); |
| if (new_worker_threads == worker_threads_.size()) { |
| return; |
| } |
| size_t delta = 0; |
| if (new_worker_threads > worker_threads_.size()) { |
| delta = new_worker_threads - worker_threads_.size(); |
| // Log before growing the pool so the "from" value reflects the old worker count. |
| info("[server] Increase worker threads from {} to {}", worker_threads_.size(), new_worker_threads); |
| increaseWorkerThreads(delta); |
| return; |
| } |
| |
| delta = worker_threads_.size() - new_worker_threads; |
| info("[server] Decrease worker threads from {} to {}", worker_threads_.size(), new_worker_threads); |
| decreaseWorkerThreads(delta); |
| } |
| |
| void Server::increaseWorkerThreads(size_t delta) { |
| for (size_t i = 0; i < delta; i++) { |
| auto worker = std::make_unique<Worker>(this, config_); |
| auto worker_thread = std::make_unique<WorkerThread>(std::move(worker)); |
| worker_thread->Start(); |
| worker_threads_.emplace_back(std::move(worker_thread)); |
| } |
| } |
| |
| void Server::decreaseWorkerThreads(size_t delta) { |
| auto current_worker_threads = worker_threads_.size(); |
| CHECK(current_worker_threads > delta); |
| auto remain_worker_threads = current_worker_threads - delta; |
| for (size_t i = remain_worker_threads; i < current_worker_threads; i++) { |
| // The unix socket is listened on only by the first worker, |
| // so workers MUST be removed from the end of the vector; |
| // otherwise, the unix socket would be closed. |
| auto worker_thread = std::move(worker_threads_.back()); |
| worker_threads_.pop_back(); |
| // Migrate its connections to other workers before stopping this worker; |
| // the target worker is chosen in a round-robin fashion here. |
| auto connections = worker_thread->GetWorker()->GetConnections(); |
| for (const auto &iter : connections) { |
| auto target_worker = worker_threads_[iter.first % remain_worker_threads]->GetWorker(); |
| worker_thread->GetWorker()->MigrateConnection(target_worker, iter.second); |
| } |
| worker_thread->Stop(10 /* graceful timeout */); |
| // Don't join the worker thread here, because it may join itself. |
| recycle_worker_threads_.push(std::move(worker_thread)); |
| } |
| } |
| |
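| // cleanupExitedWorkerThreads joins and releases recycled worker threads that have terminated |
| // (or all of them when `force` is set); threads that are still running are pushed back to the queue. |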
| void Server::cleanupExitedWorkerThreads(bool force) { |
| std::unique_ptr<WorkerThread> worker_thread = nullptr; |
| auto total = recycle_worker_threads_.unsafe_size(); |
| for (size_t i = 0; i < total; i++) { |
| if (!recycle_worker_threads_.try_pop(worker_thread)) { |
| break; |
| } |
| if (worker_thread->IsTerminated() || force) { |
| worker_thread->Join(); |
| worker_thread.reset(); |
| } else { |
| // Push the worker thread back to the queue if it's still running. |
| recycle_worker_threads_.push(std::move(worker_thread)); |
| } |
| } |
| } |
| |
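| // A replication-id log entry is encoded as kReplIdTag followed by a space and the replication id |
| // (kReplIdLength bytes), which matches the size check in Decode() below. |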
| std::string ServerLogData::Encode() const { |
| if (type_ == kReplIdLog) { |
| return std::string(1, kReplIdTag) + " " + content_; |
| } |
| return content_; |
| } |
| |
| Status ServerLogData::Decode(const rocksdb::Slice &blob) { |
| if (blob.size() == 0) { |
| return {Status::NotOK}; |
| } |
| |
| const char *header = blob.data(); |
| // Only `kReplIdTag` is supported for now. |
| if (*header == kReplIdTag && blob.size() == 2 + kReplIdLength) { |
| type_ = kReplIdLog; |
| content_ = std::string(blob.data() + 2, blob.size() - 2); |
| return Status::OK(); |
| } |
| return {Status::NotOK}; |
| } |
| |
| void Server::updateWatchedKeysFromRange(const std::vector<std::string> &args, const redis::CommandKeyRange &range) { |
| std::shared_lock lock(watched_key_mutex_); |
| |
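| // A positive last_key is an absolute index into args; otherwise it counts from the end of args |
| // (e.g. -1 denotes the last argument), mirroring the bound check below. |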
| for (size_t i = range.first_key; range.last_key > 0 ? i <= size_t(range.last_key) : i <= args.size() + range.last_key; |
| i += range.key_step) { |
| if (auto iter = watched_key_map_.find(args[i]); iter != watched_key_map_.end()) { |
| for (auto *conn : iter->second) { |
| conn->watched_keys_modified = true; |
| } |
| } |
| } |
| } |
| |
| void Server::updateAllWatchedKeys() { |
| std::shared_lock lock(watched_key_mutex_); |
| |
| for (auto &[_, conn_map] : watched_key_map_) { |
| for (auto *conn : conn_map) { |
| conn->watched_keys_modified = true; |
| } |
| } |
| } |
| |
| void Server::UpdateWatchedKeysFromArgs(const std::vector<std::string> &args, const redis::CommandAttributes &attr) { |
| if ((attr.GenerateFlags(args) & redis::kCmdWrite) && watched_key_size_ > 0) { |
| attr.ForEachKeyRange([this](const std::vector<std::string> &args, |
| redis::CommandKeyRange range) { updateWatchedKeysFromRange(args, range); }, |
| args, [this](const std::vector<std::string> &) { updateAllWatchedKeys(); }); |
| } |
| } |
| |
| void Server::UpdateWatchedKeysManually(const std::vector<std::string> &keys) { |
| std::shared_lock lock(watched_key_mutex_); |
| |
| for (const auto &key : keys) { |
| if (auto iter = watched_key_map_.find(key); iter != watched_key_map_.end()) { |
| for (auto *conn : iter->second) { |
| conn->watched_keys_modified = true; |
| } |
| } |
| } |
| } |
| |
| void Server::WatchKey(redis::Connection *conn, const std::vector<std::string> &keys) { |
| std::unique_lock lock(watched_key_mutex_); |
| |
| for (const auto &key : keys) { |
| if (auto iter = watched_key_map_.find(key); iter != watched_key_map_.end()) { |
| iter->second.emplace(conn); |
| } else { |
| watched_key_map_.emplace(key, std::set<redis::Connection *>{conn}); |
| } |
| |
| conn->watched_keys.insert(key); |
| } |
| |
| watched_key_size_ = watched_key_map_.size(); |
| } |
| |
| bool Server::IsWatchedKeysModified(redis::Connection *conn) { return conn->watched_keys_modified; } |
| |
| void Server::ResetWatchedKeys(redis::Connection *conn) { |
| if (watched_key_size_ != 0) { |
| std::unique_lock lock(watched_key_mutex_); |
| |
| for (const auto &key : conn->watched_keys) { |
| if (auto iter = watched_key_map_.find(key); iter != watched_key_map_.end()) { |
| iter->second.erase(conn); |
| |
| if (iter->second.empty()) { |
| watched_key_map_.erase(iter); |
| } |
| } |
| } |
| |
| conn->watched_keys.clear(); |
| conn->watched_keys_modified = false; |
| watched_key_size_ = watched_key_map_.size(); |
| } |
| } |
| |
| std::list<std::pair<std::string, uint32_t>> Server::GetSlaveHostAndPort() { |
| std::list<std::pair<std::string, uint32_t>> result; |
| std::lock_guard<std::mutex> guard(slave_threads_mu_); |
| for (const auto &slave : slave_threads_) { |
| if (slave->IsStopped()) continue; |
| result.emplace_back(slave->GetConn()->GetAnnounceIP(), slave->GetConn()->GetListeningPort()); |
| } |
| return result; |
| } |
| |
| // The numeric cursor consists of a 16-bit counter, a 16-bit timestamp, a 29-bit hash, and a 3-bit cursor type. The |
| // hash is used to prevent information leakage. The timestamp is used to prevent generating the same cursor in the |
| // extremely short period before and after a restart. |
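| // Bit layout of the resulting cursor (derived from the shifts below): |
| // bits 63-61: cursor type, bits 60-32: hash (top 3 bits cleared), bits 31-16: timestamp, bits 15-0: counter. |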
| NumberCursor::NumberCursor(CursorType cursor_type, uint16_t counter, const std::string &key_name) { |
| auto hash = static_cast<uint32_t>(std::hash<std::string>{}(key_name)); |
| auto time_stamp = static_cast<uint16_t>(util::GetTimeStamp()); |
| // Clear the top 3 bits of the hash; they are reserved for the cursor type. |
| constexpr uint64_t hash_mask = 0x1FFFFFFFFFFFFFFF; |
| cursor_ = static_cast<uint64_t>(counter) | static_cast<uint64_t>(time_stamp) << 16 | |
| (static_cast<uint64_t>(hash) << 32 & hash_mask) | static_cast<uint64_t>(cursor_type) << 61; |
| } |
| |
| bool NumberCursor::IsMatch(const CursorDictElement &element, CursorType cursor_type) const { |
| return cursor_ == element.cursor.cursor_ && cursor_type == getCursorType(); |
| } |
| |
| std::string Server::GenerateCursorFromKeyName(const std::string &key_name, CursorType cursor_type, const char *prefix) { |
| if (!config_->redis_cursor_compatible) { |
| // add prefix for SCAN |
| return prefix + key_name; |
| } |
| auto counter = cursor_counter_.fetch_add(1); |
| auto number_cursor = NumberCursor(cursor_type, counter, key_name); |
| cursor_dict_->at(number_cursor.GetIndex()) = {number_cursor, key_name}; |
| return number_cursor.ToString(); |
| } |
| |
| std::string Server::GetKeyNameFromCursor(const std::string &cursor, CursorType cursor_type) { |
| // When the cursor is 0, the cursor string is empty. |
| if (cursor.empty() || !config_->redis_cursor_compatible) { |
| return cursor; |
| } |
| |
| auto s = ParseInt<uint64_t>(cursor, 10); |
| // When the cursor is 0 or not an integer, return an empty string. |
| // Although the parameter 'cursor' is not expected to be 0, we still check for 0 to make the code more robust. |
| if (!s.IsOK() || *s == 0) { |
| return {}; |
| } |
| auto number_cursor = NumberCursor(*s); |
| // Because the index information is fully stored in the cursor, we can directly obtain the index from the cursor. |
| auto item = cursor_dict_->at(number_cursor.GetIndex()); |
| if (number_cursor.IsMatch(item, cursor_type)) { |
| return item.key_name; |
| } |
| |
| return {}; |
| } |
| |
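| // AuthenticateUser first tries to match the password against a namespace token (IS_USER with that |
| // namespace), then against the global requirepass (IS_ADMIN with the default namespace); when |
| // requirepass is empty, no authentication is required. |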
| AuthResult Server::AuthenticateUser(const std::string &user_password, std::string *ns) { |
| const auto &requirepass = GetConfig()->requirepass; |
| if (requirepass.empty()) { |
| return AuthResult::NO_REQUIRE_PASS; |
| } |
| |
| auto get_ns = GetNamespace()->GetByToken(user_password); |
| if (get_ns.IsOK()) { |
| *ns = get_ns.GetValue(); |
| return AuthResult::IS_USER; |
| } |
| |
| if (user_password != requirepass) { |
| return AuthResult::INVALID_PASSWORD; |
| } |
| *ns = kDefaultNamespace; |
| return AuthResult::IS_ADMIN; |
| } |