blob: 3a6e58c049cc2b8cd05022ef4ddc8639d5d17542 [file] [log] [blame]
#include "worker.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <list>
#include <cctype>
#include <utility>
#include <algorithm>
#include <glog/logging.h>
#include "redis_request.h"
#include "redis_connection.h"
#include "server.h"
#include "util.h"
Worker::Worker(Server *svr, Config *config, bool repl) : svr_(svr), repl_(repl) {
base_ = event_base_new();
if (!base_) throw std::exception();
timer_ = event_new(base_, -1, EV_PERSIST, TimerCB, this);
timeval tm = {10, 0};
evtimer_add(timer_, &tm);
int port = repl ? config->repl_port : config->port;
auto binds = repl ? config->repl_binds : config->binds;
for (const auto &bind : binds) {
Status s = listen(bind, port, config->backlog);
if (!s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen on: "<< bind << ":" << port
<< ", encounter error: " << s.Msg();
exit(1);
}
}
}
Worker::~Worker() {
std::list<Redis::Connection*> conns;
for (const auto &iter : conns_) {
conns.emplace_back(iter.second);
}
for (const auto &iter : conns) {
iter->Close();
}
event_free(timer_);
if (rate_limit_group_ != nullptr) {
bufferevent_rate_limit_group_free(rate_limit_group_);
}
if (rate_limit_group_cfg_ != nullptr) {
ev_token_bucket_cfg_free(rate_limit_group_cfg_);
}
event_base_free(base_);
}
void Worker::TimerCB(int, int16_t events, void *ctx) {
auto worker = static_cast<Worker*>(ctx);
auto config = worker->svr_->GetConfig();
if (config->timeout == 0) return;
worker->KickoutIdleClients(config->timeout);
}
void Worker::newConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx) {
auto worker = static_cast<Worker *>(ctx);
if (worker->IsRepl()) {
DLOG(INFO) << "[worker] New connection: fd=" << fd
<< " from port: " << worker->svr_->GetConfig()->repl_port << " thread #"
<< worker->tid_;
} else {
DLOG(INFO) << "[worker] New connection: fd=" << fd
<< " from port: " << worker->svr_->GetConfig()->port << " thread #"
<< worker->tid_;
}
auto s = Util::SockSetTcpKeepalive(fd, 120);
if (!s.IsOK()) {
LOG(ERROR) << "[worker] Failed to set tcp-keepalive, err:" << s.Msg();
evutil_closesocket(fd);
return;
}
s = Util::SockSetTcpNoDelay(fd, 1);
if (!s.IsOK()) {
LOG(ERROR) << "[worker] Failed to set tcp-nodelay, err:" << s.Msg();
evutil_closesocket(fd);
return;
}
event_base *base = evconnlistener_get_base(listener);
auto evThreadSafeFlags = BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS;
bufferevent *bev = bufferevent_socket_new(base,
fd,
BEV_OPT_CLOSE_ON_FREE | evThreadSafeFlags);
auto conn = new Redis::Connection(bev, worker);
bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite,
Redis::Connection::OnEvent, conn);
bufferevent_enable(bev, EV_READ);
Status status = worker->AddConnection(conn);
std::string ip;
uint32_t port;
if (Util::GetPeerAddr(fd, &ip, &port) == 0) {
conn->SetAddr(ip, port);
}
if (!status.IsOK()) {
std::string err_msg = Redis::Error("ERR " + status.Msg());
write(fd, err_msg.data(), err_msg.size());
conn->Close();
}
if (worker->rate_limit_group_ != nullptr) {
bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
}
}
Status Worker::listen(const std::string &host, int port, int backlog) {
sockaddr_in sin{};
sin.sin_family = AF_INET;
evutil_inet_pton(AF_INET, host.data(), &(sin.sin_addr));
sin.sin_port = htons(port);
int fd = socket(AF_INET, SOCK_STREAM, 0);
int sock_opt = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sock_opt, sizeof(sock_opt)) < 0) {
return Status(Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
// to support multi-thread binding on macOS
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &sock_opt, sizeof(sock_opt)) < 0) {
return Status(Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
return Status(Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
evutil_make_socket_nonblocking(fd);
auto lev = evconnlistener_new(base_, newConnection, this,
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
return Status::OK();
}
void Worker::Run(std::thread::id tid) {
tid_ = tid;
if (event_base_dispatch(base_) != 0) {
LOG(ERROR) << "[worker] Failed to run server, err: " << strerror(errno);
}
}
void Worker::Stop() {
event_base_loopbreak(base_);
for (const auto &lev : listen_events_) {
evutil_socket_t fd = evconnlistener_get_fd(lev);
if (fd > 0) close(fd);
evconnlistener_free(lev);
}
}
Status Worker::AddConnection(Redis::Connection *c) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(c->GetFD());
if (iter != conns_.end()) {
return Status(Status::NotOK, "connection was exists");
}
int max_clients = svr_->GetConfig()->maxclients;
if (svr_->IncrClientNum() >= max_clients) {
svr_->DecrClientNum();
return Status(Status::NotOK, "max number of clients reached");
}
conns_.insert(std::pair<int, Redis::Connection*>(c->GetFD(), c));
uint64_t id = svr_->GetClientID()->fetch_add(1, std::memory_order_relaxed);
c->SetID(id);
return Status::OK();
}
Redis::Connection *Worker::removeConnection(int fd) {
Redis::Connection *conn = nullptr;
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
conn = iter->second;
conns_.erase(iter);
svr_->DecrClientNum();
}
iter = monitor_conns_.find(fd);
if (iter != monitor_conns_.end()) {
conn = iter->second;
monitor_conns_.erase(iter);
svr_->DecrClientNum();
svr_->DecrMonitorClientNum();
}
return conn;
}
void Worker::DetachConnection(Redis::Connection *conn) {
if (!conn) return;
removeConnection(conn->GetFD());
if (rate_limit_group_ != nullptr) {
bufferevent_remove_from_rate_limit_group(conn->GetBufferEvent());
}
auto bev = conn->GetBufferEvent();
bufferevent_disable(bev, EV_READ|EV_WRITE);
bufferevent_setcb(bev, nullptr, nullptr, nullptr, nullptr);
}
void Worker::FreeConnection(Redis::Connection *conn) {
if (!conn) return;
removeConnection(conn->GetFD());
if (rate_limit_group_ != nullptr) {
bufferevent_remove_from_rate_limit_group(conn->GetBufferEvent());
}
delete conn;
}
void Worker::FreeConnectionByID(int fd, uint64_t id) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto conn_iter = conns_.find(fd);
if (conn_iter != conns_.end() && conn_iter->second->GetID() == id) {
if (rate_limit_group_ != nullptr) {
bufferevent_remove_from_rate_limit_group(conn_iter->second->GetBufferEvent());
}
delete conn_iter->second;
conns_.erase(conn_iter);
svr_->DecrClientNum();
}
auto monitor_conn_iter = monitor_conns_.find(fd);
if (monitor_conn_iter != monitor_conns_.end() && monitor_conn_iter->second->GetID() == id) {
delete monitor_conn_iter->second;
monitor_conns_.erase(monitor_conn_iter);
svr_->DecrClientNum();
svr_->DecrMonitorClientNum();
}
}
Status Worker::EnableWriteEvent(int fd) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
auto bev = iter->second->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
return Status::OK();
}
return Status(Status::NotOK);
}
Status Worker::Reply(int fd, const std::string &reply) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
iter->second->SetLastInteraction();
Redis::Reply(iter->second->Output(), reply);
return Status::OK();
}
return Status(Status::NotOK, "connection doesn't exist");
}
int Worker::SetReplicationRateLimit(uint64_t max_replication_bytes) {
auto write_limit = EV_RATE_LIMIT_MAX;
if (max_replication_bytes > 0) {
write_limit = max_replication_bytes;
}
struct timeval cfg_tick = {1, 0};
auto old_cfg = rate_limit_group_cfg_;
rate_limit_group_cfg_ = ev_token_bucket_cfg_new(
EV_RATE_LIMIT_MAX, EV_RATE_LIMIT_MAX,
write_limit, write_limit,
&cfg_tick);
if (rate_limit_group_cfg_ == nullptr) {
LOG(ERROR) << "[server] ev_token_bucket_cfg_new error";
rate_limit_group_cfg_ = old_cfg;
return -1;
}
if (rate_limit_group_ != nullptr) {
bufferevent_rate_limit_group_set_cfg(rate_limit_group_, rate_limit_group_cfg_);
} else {
rate_limit_group_ = bufferevent_rate_limit_group_new(base_, rate_limit_group_cfg_);
}
if (old_cfg != nullptr) {
ev_token_bucket_cfg_free(old_cfg);
}
return 0;
}
void Worker::BecomeMonitorConn(Redis::Connection *conn) {
conns_mu_.lock();
conns_.erase(conn->GetFD());
monitor_conns_[conn->GetFD()] = conn;
conns_mu_.unlock();
svr_->IncrMonitorClientNum();
conn->EnableFlag(Redis::Connection::kMonitor);
}
void Worker::FeedMonitorConns(Redis::Connection *conn, const std::vector<std::string> &tokens) {
struct timeval tv;
gettimeofday(&tv, nullptr);
std::string output;
output += std::to_string(tv.tv_sec) + "." + std::to_string(tv.tv_usec);
output += " [" + conn->GetNamespace() + " " + conn->GetAddr() + "]";
for (const auto &token : tokens) {
output += " \"" + token + "\"";
}
std::unique_lock<std::mutex> lock(conns_mu_);
for (const auto &iter : monitor_conns_) {
if (conn == iter.second) continue; // skip the monitor command
if (conn->GetNamespace() == iter.second->GetNamespace()
|| iter.second->GetNamespace() == kDefaultNamespace) {
iter.second->Reply(Redis::SimpleString(output));
}
}
}
std::string Worker::GetClientsStr() {
std::unique_lock<std::mutex> lock(conns_mu_);
std::string output;
for (const auto iter : conns_) {
Redis::Connection *conn = iter.second;
output.append(conn->ToString());
}
return output;
}
void Worker::KillClient(Redis::Connection *self, uint64_t id, std::string addr, bool skipme, int64_t *killed) {
conns_mu_.lock();
for (const auto iter : conns_) {
Redis::Connection* conn = iter.second;
if (skipme && self == conn) continue;
if ((!addr.empty() && conn->GetAddr() == addr) || (id != 0 && conn->GetID() == id)) {
conn->EnableFlag(Redis::Connection::kCloseAfterReply);
// enable write event to notify worker wake up ASAP, and remove the connection
if (!conn->IsFlagEnabled(Redis::Connection::kSlave)) { // don't enable any event in slave connection
auto bev = conn->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
}
(*killed)++;
}
}
conns_mu_.unlock();
}
void Worker::KickoutIdleClients(int timeout) {
conns_mu_.lock();
std::list<std::pair<int, uint64_t>> to_be_killed_conns;
if (conns_.empty()) {
conns_mu_.unlock();
return;
}
int iterations = std::min(static_cast<int>(conns_.size()), 50);
auto iter = conns_.upper_bound(last_iter_conn_fd);
while (iterations--) {
if (iter == conns_.end()) iter = conns_.begin();
if (static_cast<int>(iter->second->GetIdleTime()) >= timeout) {
to_be_killed_conns.emplace_back(std::make_pair(iter->first, iter->second->GetID()));
}
iter++;
}
iter--;
last_iter_conn_fd = iter->first;
conns_mu_.unlock();
for (const auto conn : to_be_killed_conns) {
FreeConnectionByID(conn.first, conn.second);
}
}
void WorkerThread::Start() {
try {
t_ = std::thread([this]() {
if (this->worker_->IsRepl()) {
Util::ThreadSetName("repl-worker");
} else {
Util::ThreadSetName("worker");
}
this->worker_->Run(std::this_thread::get_id());
});
} catch (const std::system_error &e) {
LOG(ERROR) << "[worker] Failed to start worker thread, err: " << e.what();
return;
}
LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}
void WorkerThread::Stop() {
worker_->Stop();
}
void WorkerThread::Join() {
if (t_.joinable()) t_.join();
}