| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "worker.h" |
| |
| #include <event2/util.h> |
| #include <unistd.h> |
| |
| #include <cstdint> |
| #include <stdexcept> |
| #include <string> |
| |
| #include "event2/bufferevent.h" |
| #include "io_util.h" |
| #include "logging.h" |
| #include "scope_exit.h" |
| #include "thread_util.h" |
| |
| #ifdef ENABLE_OPENSSL |
| #include <event2/bufferevent_ssl.h> |
| #include <openssl/err.h> |
| #include <openssl/ssl.h> |
| #endif |
| |
| #include <sys/socket.h> |
| #include <sys/stat.h> |
| #include <sys/types.h> |
| #include <sys/un.h> |
| |
| #include <algorithm> |
| #include <utility> |
| |
| #include "redis_connection.h" |
| #include "redis_request.h" |
| #include "server.h" |
| #include "storage/scripting.h" |
| |
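// Each worker owns its own event loop. The constructor sets up a 10-second
// periodic timer (used by TimerCB to kick out idle clients), the listener
// sockets (either a pre-opened fd or one TCP listener per bind address and
// port), and a per-worker Lua state for scripting.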
| Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) { |
  if (!base_) throw std::runtime_error{"failed to create the event base"};
| |
| timer_.reset(NewEvent(base_, -1, EV_PERSIST)); |
| timeval tm = {10, 0}; |
| evtimer_add(timer_.get(), &tm); |
| |
| if (config->socket_fd != -1) { |
| if (const Status s = listenFD(config->socket_fd, config->port, config->backlog); !s.IsOK()) { |
| error("[worker] Failed to listen to socket with fd: {}, Error: {}", config->socket_fd, s.Msg()); |
| exit(1); |
| } |
| } else { |
| const uint32_t ports[3] = {config->port, config->tls_port, 0}; |
| |
| for (const uint32_t *port = ports; *port; ++port) { |
| for (const auto &bind : config->binds) { |
| if (const Status s = listenTCP(bind, *port, config->backlog); !s.IsOK()) { |
| error("[worker] Failed to listen on: {}:{}, Error: {}", bind, *port, s.Msg()); |
| exit(1); |
| } |
| info("[worker] Listening on: {}:{}", bind, *port); |
| } |
| } |
| } |
| lua_ = lua::CreateState(); |
| } |
| |
| Worker::~Worker() { |
| std::vector<redis::Connection *> conns; |
| conns.reserve(conns_.size() + monitor_conns_.size()); |
| |
| for (const auto &iter : conns_) { |
| conns.emplace_back(iter.second); |
| } |
| for (const auto &iter : monitor_conns_) { |
| conns.emplace_back(iter.second); |
| } |
| for (const auto &iter : conns) { |
| iter->Close(); |
| } |
| |
| timer_.reset(); |
| if (rate_limit_group_) { |
| bufferevent_rate_limit_group_free(rate_limit_group_); |
| } |
| if (rate_limit_group_cfg_) { |
| ev_token_bucket_cfg_free(rate_limit_group_cfg_); |
| } |
| event_base_free(base_); |
| lua::DestroyState(lua_); |
| } |
| |
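// Invoked by the periodic timer created in the constructor. A timeout of 0
// disables the idle-client check.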
| void Worker::TimerCB(int, [[maybe_unused]] int16_t events) { |
| auto config = srv->GetConfig(); |
| if (config->timeout == 0) return; |
| KickoutIdleClients(config->timeout); |
| } |
| |
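// Accept callback for TCP listeners: tunes the socket (keepalive, nodelay),
// wraps the fd in a plain or TLS bufferevent depending on the local port, and
// registers the connection with this worker. On any failure the socket is closed.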
| void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, [[maybe_unused]] sockaddr *address, |
| [[maybe_unused]] int socklen) { |
| int local_port = util::GetLocalPort(fd); // NOLINT |
| debug("[worker] New connection: fd={} from port: {} thread #{}", fd, local_port, fmt::streamed(tid_)); |
| |
| auto s = util::SockSetTcpKeepalive(fd, 120); |
| if (!s.IsOK()) { |
| error("[worker] Failed to set tcp-keepalive on socket. Error: {}", s.Msg()); |
| evutil_closesocket(fd); |
| return; |
| } |
| |
| s = util::SockSetTcpNoDelay(fd, 1); |
| if (!s.IsOK()) { |
| error("[worker] Failed to set tcp-nodelay on socket. Error: {}", s.Msg()); |
| evutil_closesocket(fd); |
| return; |
| } |
| |
| event_base *base = evconnlistener_get_base(listener); |
| auto ev_thread_safe_flags = |
| BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS | BEV_OPT_CLOSE_ON_FREE; |
| |
| bufferevent *bev = nullptr; |
| ssl_st *ssl = nullptr; |
| #ifdef ENABLE_OPENSSL |
| if (uint32_t(local_port) == srv->GetConfig()->tls_port) { |
| ssl = SSL_new(srv->ssl_ctx.get()); |
| if (!ssl) { |
| error("[worker] Failed to construct SSL structure for new connection: {}", fmt::streamed(SSLErrors{})); |
| evutil_closesocket(fd); |
| return; |
| } |
| bev = bufferevent_openssl_socket_new(base, fd, ssl, BUFFEREVENT_SSL_ACCEPTING, ev_thread_safe_flags); |
| } else { |
| bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags); |
| } |
| #else |
| bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags); |
| #endif |
| if (!bev) { |
| auto socket_err = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()); |
| #ifdef ENABLE_OPENSSL |
| error("[worker] Failed to construct socket for new connection: {}, SSL error: {}", socket_err, |
| fmt::streamed(SSLErrors{})); |
| if (ssl) SSL_free(ssl); |
| #else |
| error("[worker] Failed to construct socket for new connection: {}", socket_err); |
| #endif |
| evutil_closesocket(fd); |
| return; |
| } |
| #ifdef ENABLE_OPENSSL |
| if (uint32_t(local_port) == srv->GetConfig()->tls_port) { |
| bufferevent_openssl_set_allow_dirty_shutdown(bev, 1); |
| } |
| #endif |
| auto conn = new redis::Connection(bev, this); |
| conn->SetCB(bev); |
| bufferevent_enable(bev, EV_READ); |
| |
| s = AddConnection(conn); |
| if (!s.IsOK()) { |
| std::string err_msg = redis::Error({Status::NotOK, s.Msg()}); |
| s = util::SockSend(fd, err_msg, ssl); |
| if (!s.IsOK()) { |
| warn("[worker] Failed to send error response to socket: {}", s.Msg()); |
| } |
| conn->Close(); |
| return; |
| } |
| |
| if (auto s = util::GetPeerAddr(fd)) { |
| auto [ip, port] = std::move(*s); |
| conn->SetAddr(ip, port); |
| } |
| |
| if (rate_limit_group_) { |
| bufferevent_add_to_rate_limit_group(bev, rate_limit_group_); |
| } |
| } |
| |
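// Accept callback for the Unix domain socket listener; like newTCPConnection,
// but without the TCP socket options and the TLS branch.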
| void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, [[maybe_unused]] sockaddr *address, |
| [[maybe_unused]] int socklen) { |
| debug("[worker] New connection: fd={} from unixsocket: {} thread #{}", fd, srv->GetConfig()->unixsocket, |
| fmt::streamed(tid_)); |
| event_base *base = evconnlistener_get_base(listener); |
| auto ev_thread_safe_flags = |
| BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS | BEV_OPT_CLOSE_ON_FREE; |
  bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);
  if (!bev) {
    error("[worker] Failed to construct socket for new connection: {}",
          evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
    evutil_closesocket(fd);
    return;
  }

| auto conn = new redis::Connection(bev, this); |
| conn->SetCB(bev); |
| bufferevent_enable(bev, EV_READ); |
| |
| auto s = AddConnection(conn); |
| if (!s.IsOK()) { |
| s = util::SockSend(fd, redis::Error(s)); |
| if (!s.IsOK()) { |
| warn("[worker] Failed to send error response to socket: {}", s.Msg()); |
| } |
| conn->Close(); |
| return; |
| } |
| |
| conn->SetAddr(srv->GetConfig()->unixsocket, 0); |
| if (rate_limit_group_) { |
| bufferevent_add_to_rate_limit_group(bev, rate_limit_group_); |
| } |
| } |
| |
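// Listens on a socket fd handed to the server from outside (config->socket_fd),
// presumably inherited from a parent process or a socket-activation setup. The
// fd is dup'ed so that each worker owns an independent descriptor for the same
// listening socket.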
| Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) { |
| const uint32_t port = util::GetLocalPort(fd); |
| if (port != expected_port) { |
| return {Status::NotOK, "The port of the provided socket fd doesn't match the configured port"}; |
| } |
| const int dup_fd = dup(fd); |
| if (dup_fd == -1) { |
| return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())}; |
| } |
| evconnlistener *lev = |
| NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd); |
| listen_events_.emplace_back(lev); |
| info("[worker] Listening on dup'ed fd: {}", dup_fd); |
| return Status::OK(); |
| } |
| |
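// Creates one listening socket per address resolved for `host` and hooks each
// of them into this worker's event base. Every worker listens on the same
// host/port pairs; SO_REUSEPORT lets the kernel spread incoming connections
// across the workers.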
| Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) { |
  bool ipv6_used = strchr(host.data(), ':') != nullptr;
| |
| addrinfo hints = {}; |
| hints.ai_family = ipv6_used ? AF_INET6 : AF_INET; |
| hints.ai_socktype = SOCK_STREAM; |
| hints.ai_flags = AI_PASSIVE; |
| |
| addrinfo *srv_info = nullptr; |
| if (int rv = getaddrinfo(host.data(), std::to_string(port).c_str(), &hints, &srv_info); rv != 0) { |
| return {Status::NotOK, gai_strerror(rv)}; |
| } |
| auto exit = MakeScopeExit([srv_info] { freeaddrinfo(srv_info); }); |
| |
| for (auto p = srv_info; p != nullptr; p = p->ai_next) { |
| int fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); |
| if (fd == -1) continue; |
| |
    int sock_opt = 1;
    if (ipv6_used && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &sock_opt, sizeof(sock_opt)) == -1) {
      auto err = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
      evutil_closesocket(fd);
      return {Status::NotOK, err};
    }

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sock_opt, sizeof(sock_opt)) < 0) {
      auto err = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
      evutil_closesocket(fd);
      return {Status::NotOK, err};
    }

    // set SO_REUSEPORT so that every worker can bind a listener to the same port
    // (required for multi-thread binding on macOS)
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &sock_opt, sizeof(sock_opt)) < 0) {
      auto err = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
      evutil_closesocket(fd);
      return {Status::NotOK, err};
    }

    if (bind(fd, p->ai_addr, p->ai_addrlen)) {
      auto err = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
      evutil_closesocket(fd);
      return {Status::NotOK, err};
    }
| |
| evutil_make_socket_nonblocking(fd); |
| auto lev = |
| NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, fd); |
| listen_events_.emplace_back(lev); |
| } |
| |
| return Status::OK(); |
| } |
| |
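// Binds a Unix domain socket at `path`, removing any stale socket file first.
// A non-zero `perm` is applied to the socket file via chmod.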
| Status Worker::ListenUnixSocket(const std::string &path, int perm, int backlog) { |
| unlink(path.c_str()); |
| sockaddr_un sa{}; |
| if (path.size() > sizeof(sa.sun_path) - 1) { |
| return {Status::NotOK, "unix socket path too long"}; |
| } |
| |
| sa.sun_family = AF_LOCAL; |
| strncpy(sa.sun_path, path.c_str(), sizeof(sa.sun_path) - 1); |
| int fd = socket(AF_LOCAL, SOCK_STREAM, 0); |
| if (fd == -1) { |
| return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())}; |
| } |
| |
| if (bind(fd, (sockaddr *)&sa, sizeof(sa)) < 0) { |
| return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())}; |
| } |
| |
| evutil_make_socket_nonblocking(fd); |
| auto lev = NewEvconnlistener<&Worker::newUnixSocketConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd); |
| listen_events_.emplace_back(lev); |
| if (perm != 0) { |
| chmod(sa.sun_path, (mode_t)perm); |
| } |
| |
| return Status::OK(); |
| } |
| |
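// Runs the event loop on the calling thread; blocks until Stop() is called.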
| void Worker::Run(std::thread::id tid) { |
| tid_ = tid; |
| if (event_base_dispatch(base_) != 0) { |
| error("[worker] Failed to run server, err: {}", strerror(errno)); |
| } |
| is_terminated_ = true; |
| } |
| |
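// Frees the listeners first so no new connections are accepted, then either
// lets the loop drain for `wait_seconds` or breaks out of it immediately.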
| void Worker::Stop(uint32_t wait_seconds) { |
| for (const auto &lev : listen_events_) { |
| // It's unnecessary to close the listener fd since we have set the LEV_OPT_CLOSE_ON_FREE flag |
| evconnlistener_free(lev); |
| } |
  // wait_seconds == 0 means stop immediately; otherwise wait up to N seconds
  // for the worker to process the remaining requests before stopping.
| if (wait_seconds > 0) { |
| timeval tv = {wait_seconds, 0}; |
| event_base_loopexit(base_, &tv); |
| } else { |
| event_base_loopbreak(base_); |
| } |
| } |
| |
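// Registers a connection with this worker, enforcing the server-wide
// maxclients limit and assigning the connection a unique client ID.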
| Status Worker::AddConnection(redis::Connection *c) { |
| std::unique_lock<std::mutex> lock(conns_mu_); |
| auto iter = conns_.find(c->GetFD()); |
| if (iter != conns_.end()) { |
| return {Status::NotOK, "connection was exists"}; |
| } |
| |
| int max_clients = srv->GetConfig()->maxclients; |
| if (srv->IncrClientNum() >= max_clients) { |
| srv->DecrClientNum(); |
| return {Status::NotOK, "max number of clients reached"}; |
| } |
| |
| conns_.emplace(c->GetFD(), c); |
| uint64_t id = srv->GetClientID(); |
| c->SetID(id); |
| |
| return Status::OK(); |
| } |
| |
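// Removes the connection with the given fd from whichever map holds it
// (regular or monitor) and adjusts the server-wide client counters. Returns
// the connection, or nullptr if the fd is unknown.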
| redis::Connection *Worker::removeConnection(int fd) { |
| redis::Connection *conn = nullptr; |
| |
| std::unique_lock<std::mutex> lock(conns_mu_); |
| auto iter = conns_.find(fd); |
| if (iter != conns_.end()) { |
| conn = iter->second; |
| conns_.erase(iter); |
| srv->DecrClientNum(); |
| } |
| |
| iter = monitor_conns_.find(fd); |
| if (iter != monitor_conns_.end()) { |
| conn = iter->second; |
| monitor_conns_.erase(iter); |
| srv->DecrClientNum(); |
| srv->DecrMonitorClientNum(); |
| } |
| |
| return conn; |
| } |
| |
| // MigrateConnection moves the connection to another worker |
| // when reducing the number of workers. |
| // |
// To keep it simple, we just close the connection if it's
// blocked on a key or stream.
| void Worker::MigrateConnection(Worker *target, redis::Connection *conn) { |
| if (!target || !conn) return; |
| |
| auto bev = conn->GetBufferEvent(); |
  // Disable the read/write events to prevent the connection from being processed during the migration.
  bufferevent_disable(bev, EV_READ | EV_WRITE);
  // We cannot migrate a connection that has a running command: the old worker
  // may still be processing it, which would cause a data race.
  if (!conn->CanMigrate()) {
    // Re-enable the read/write events that we disabled above.
    bufferevent_enable(bev, EV_READ | EV_WRITE);
| return; |
| } |
| |
| // remove the connection from current worker |
| DetachConnection(conn); |
| if (!target->AddConnection(conn).IsOK()) { |
| conn->Close(); |
| return; |
| } |
| bufferevent_base_set(target->base_, bev); |
| conn->SetCB(bev); |
| bufferevent_enable(bev, EV_READ | EV_WRITE); |
| conn->SetOwner(target); |
| } |
| |
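// Unhooks a connection from this worker without freeing it: it is dropped from
// the bookkeeping maps and its bufferevent callbacks are cleared, so it can be
// handed over to another worker (see MigrateConnection).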
| void Worker::DetachConnection(redis::Connection *conn) { |
| if (!conn) return; |
| |
| removeConnection(conn->GetFD()); |
| |
| if (rate_limit_group_) { |
| bufferevent_remove_from_rate_limit_group(conn->GetBufferEvent()); |
| } |
| |
| auto bev = conn->GetBufferEvent(); |
| bufferevent_disable(bev, EV_READ | EV_WRITE); |
| bufferevent_setcb(bev, nullptr, nullptr, nullptr, nullptr); |
| } |
| |
| void Worker::FreeConnection(redis::Connection *conn) { |
| if (!conn) return; |
| |
| removeConnection(conn->GetFD()); |
| srv->ResetWatchedKeys(conn); |
| if (rate_limit_group_) { |
| bufferevent_remove_from_rate_limit_group(conn->GetBufferEvent()); |
| } |
| delete conn; |
| } |
| |
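// Frees the connection identified by both fd and client ID. Matching on the ID
// as well guards against the fd having been closed and reused by a different
// connection in the meantime.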
| void Worker::FreeConnectionByID(int fd, uint64_t id) { |
| std::unique_lock<std::mutex> lock(conns_mu_); |
| auto iter = conns_.find(fd); |
| if (iter != conns_.end() && iter->second->GetID() == id) { |
| if (rate_limit_group_ != nullptr) { |
| bufferevent_remove_from_rate_limit_group(iter->second->GetBufferEvent()); |
| } |
| delete iter->second; |
| conns_.erase(iter); |
| srv->DecrClientNum(); |
| } |
| |
| iter = monitor_conns_.find(fd); |
| if (iter != monitor_conns_.end() && iter->second->GetID() == id) { |
| delete iter->second; |
| monitor_conns_.erase(iter); |
| srv->DecrClientNum(); |
| srv->DecrMonitorClientNum(); |
| } |
| } |
| |
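// Allows other threads to resume the write event on one of this worker's
// connections; the lookup is guarded by conns_mu_ for that reason.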
| Status Worker::EnableWriteEvent(int fd) { |
| std::unique_lock<std::mutex> lock(conns_mu_); |
| auto iter = conns_.find(fd); |
| if (iter != conns_.end()) { |
| auto bev = iter->second->GetBufferEvent(); |
| bufferevent_enable(bev, EV_WRITE); |
| return Status::OK(); |
| } |
| |
| return {Status::NotOK, "connection doesn't exist"}; |
| } |
| |
| Status Worker::Reply(int fd, const std::string &reply) { |
| std::unique_lock<std::mutex> lock(conns_mu_); |
| auto iter = conns_.find(fd); |
| if (iter != conns_.end()) { |
| iter->second->SetLastInteraction(); |
| redis::Reply(iter->second->Output(), reply); |
| return Status::OK(); |
| } |
| |
| return {Status::NotOK, "connection doesn't exist"}; |
| } |
| |
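// Moves the connection into the monitor map so it starts receiving the command
// stream fed by FeedMonitorConns; QuitMonitorConn reverses this.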
| void Worker::BecomeMonitorConn(redis::Connection *conn) { |
| { |
| std::lock_guard<std::mutex> guard(conns_mu_); |
| conns_.erase(conn->GetFD()); |
| monitor_conns_[conn->GetFD()] = conn; |
| } |
| srv->IncrMonitorClientNum(); |
| conn->EnableFlag(redis::Connection::kMonitor); |
| } |
| |
| void Worker::QuitMonitorConn(redis::Connection *conn) { |
| { |
| std::lock_guard<std::mutex> guard(conns_mu_); |
| monitor_conns_.erase(conn->GetFD()); |
| conns_[conn->GetFD()] = conn; |
| } |
| srv->DecrMonitorClientNum(); |
| conn->DisableFlag(redis::Connection::kMonitor); |
| } |
| |
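// Broadcasts `response` to every monitor client on this worker except the one
// that generated it. Monitors in the default namespace see traffic from all
// namespaces; other monitors only see commands from their own namespace.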
| void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) { |
| std::unique_lock<std::mutex> lock(conns_mu_); |
| |
| for (const auto &iter : monitor_conns_) { |
    if (conn == iter.second) continue;  // skip the connection that generated this command
| |
| if (conn->GetNamespace() == iter.second->GetNamespace() || iter.second->GetNamespace() == kDefaultNamespace) { |
| iter.second->Reply(response); |
| } |
| } |
| } |
| |
| std::string Worker::GetClientsStr() { |
| std::unique_lock<std::mutex> lock(conns_mu_); |
| |
| std::string output; |
| for (const auto &iter : conns_) { |
| redis::Connection *conn = iter.second; |
| output.append(conn->ToString()); |
| } |
| |
| return output; |
| } |
| |
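// Marks matching connections to be closed: a connection matches on client
// type, on (announced) address, or on ID, and `skipme` excludes the caller's
// own connection. Matches are only flagged with kCloseAfterReply; the worker
// thread that owns them performs the actual close.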
| void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme, |
| int64_t *killed) { |
| std::lock_guard<std::mutex> guard(conns_mu_); |
| |
| for (const auto &iter : conns_) { |
| redis::Connection *conn = iter.second; |
| if (skipme && self == conn) continue; |
| |
| // no need to kill the client again if the kCloseAfterReply flag is set |
| if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) { |
| continue; |
| } |
| |
| if ((type & conn->GetClientType()) || |
| (!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) || |
| (id != 0 && conn->GetID() == id)) { |
| conn->EnableFlag(redis::Connection::kCloseAfterReply); |
      // enable the write event to wake up the worker ASAP so it can close the connection
      if (!conn->IsFlagEnabled(redis::Connection::kSlave)) {  // don't enable any events on slave connections
| auto bev = conn->GetBufferEvent(); |
| bufferevent_enable(bev, EV_WRITE); |
| } |
| (*killed)++; |
| } |
| } |
| } |
| |
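// Swaps in a fresh Lua state and destroys the old one. The atomic exchange
// ensures concurrent readers of lua_ see either the old or the new state.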
| void Worker::LuaReset() { |
| auto lua = lua_.exchange(lua::CreateState()); |
| lua::DestroyState(lua); |
| } |
| |
| int64_t Worker::GetLuaMemorySize() { return (int64_t)lua_gc(lua_, LUA_GCCOUNT, 0) * 1024; } |
| |
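// Scans at most 50 connections per invocation, resuming from where the last
// scan stopped (last_iter_conn_fd_), so a single timer tick never stalls the
// worker on a large connection map. Idle connections are collected under the
// lock and freed after it is released, since FreeConnectionByID locks
// conns_mu_ itself.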
| void Worker::KickoutIdleClients(int timeout) { |
| std::vector<std::pair<int, uint64_t>> to_be_killed_conns; |
| |
| { |
| std::lock_guard<std::mutex> guard(conns_mu_); |
| if (conns_.empty()) { |
| return; |
| } |
| |
| int iterations = std::min(static_cast<int>(conns_.size()), 50); |
| auto iter = conns_.upper_bound(last_iter_conn_fd_); |
| while (iterations--) { |
| if (iter == conns_.end()) iter = conns_.begin(); |
| if (static_cast<int>(iter->second->GetIdleTime()) >= timeout) { |
| to_be_killed_conns.emplace_back(iter->first, iter->second->GetID()); |
| } |
| iter++; |
| } |
| iter--; |
| last_iter_conn_fd_ = iter->first; |
| } |
| |
| for (const auto &conn : to_be_killed_conns) { |
| FreeConnectionByID(conn.first, conn.second); |
| } |
| } |
| |
| void WorkerThread::Start() { |
| auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); }); |
| |
| if (s) { |
| t_ = std::move(*s); |
| } else { |
| error("[worker] Failed to start worker thread, err: {}", s.Msg()); |
| return; |
| } |
| |
| info("[worker] Thread #{} started", fmt::streamed(t_.get_id())); |
| } |
| |
| void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); } |
| |
| void WorkerThread::Join() { |
| if (auto s = util::ThreadJoin(t_); !s) { |
| warn("[worker] {}", s.Msg()); |
| } |
| } |