blob: a8780fc77b31996d6963cb4bd205d6bf93eeb89f [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
////////////////////////////////////////////////////////////////////////////////
// Implements the BaseServer class. See baseserver.h for details on the API
////////////////////////////////////////////////////////////////////////////////
#include "network/baseserver.h"
#include "glog/logging.h"
#include "basics/basics.h"
void CallHandleConnectionCloseAndDelete(BaseServer* _server, BaseConnection* _connection,
NetworkErrorCode _status) {
_server->HandleConnectionClose_Base(_connection, _status);
delete _connection;
}
BaseServer::BaseServer(EventLoop* eventLoop, const NetworkOptions& _options) {
Init(eventLoop, _options);
}
void BaseServer::Init(EventLoop* eventLoop, const NetworkOptions& _options) {
eventLoop_ = eventLoop;
options_ = _options;
listen_fd_ = -1;
connection_options_.max_packet_size_ = options_.get_max_packet_size();
connection_options_.high_watermark_ = options_.get_high_watermark();
connection_options_.low_watermark_ = options_.get_low_watermark();
on_new_connection_callback_ = [this](EventLoop::Status status) { this->OnNewConnection(status); };
}
BaseServer::~BaseServer() {}
sp_int32 BaseServer::Start_Base() {
// open a socket
errno = 0;
listen_fd_ = socket(options_.get_socket_family(), SOCK_STREAM, 0);
if (listen_fd_ < 0) {
LOG(ERROR) << "Opening of a socket failed in server " << errno << "\n";
return -1;
}
if (SockUtils::setSocketDefaults(listen_fd_) < 0) {
close(listen_fd_);
return -1;
}
// Set the socket option for addr reuse
if (SockUtils::setReuseAddress(listen_fd_) < 0) {
LOG(ERROR) << "setsockopt of a socket failed in server " << errno << "\n";
close(listen_fd_);
return -1;
}
// Set the address
struct sockaddr_in in_addr;
struct sockaddr_un unix_addr;
struct sockaddr* serv_addr = NULL;
socklen_t sockaddr_len = 0;
if (options_.get_sin_family() == AF_INET) {
bzero(reinterpret_cast<char*>(&in_addr), sizeof(in_addr));
in_addr.sin_family = options_.get_sin_family();
in_addr.sin_port = htons(options_.get_port());
in_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr = (struct sockaddr*)&in_addr;
sockaddr_len = sizeof(in_addr);
} else {
bzero(reinterpret_cast<char*>(&unix_addr), sizeof(unix_addr));
unix_addr.sun_family = options_.get_sin_family();
snprintf(unix_addr.sun_path, sizeof(unix_addr.sun_path), "%s", options_.get_sin_path().c_str());
serv_addr = (struct sockaddr*)&unix_addr;
sockaddr_len = sizeof(unix_addr);
}
// Bind to the address
if (bind(listen_fd_, serv_addr, sockaddr_len) < 0) {
LOG(ERROR) << "bind of a socket failed in server " << errno << "\n";
close(listen_fd_);
return -1;
}
// Listen for new connections
if (listen(listen_fd_, 100) < 0) {
LOG(ERROR) << "listen of a socket failed in server " << errno << "\n";
close(listen_fd_);
return -1;
}
// Ask the EventLoop to deliver any read events
if (eventLoop_->registerForRead(listen_fd_, on_new_connection_callback_, true) < 0) {
LOG(ERROR) << "register for read of the socket failed in server\n";
close(listen_fd_);
return -1;
}
// fetch the port after bind/listen port 0
if (AF_INET == options_.get_sin_family() && 0 == options_.get_port()) {
if (getsockname(listen_fd_, serv_addr, &sockaddr_len) != 0) {
LOG(ERROR) << "getsockname() error: " << strerror(errno);
close(listen_fd_);
return SP_NOTOK;
} else {
options_.set_port(ntohs(in_addr.sin_port));
}
}
return 0;
}
sp_int32 BaseServer::Stop_Base() {
// Stop accepting new connections
CHECK_EQ(eventLoop_->unRegisterForRead(listen_fd_), 0);
// Close the listen socket.
close(listen_fd_);
// Close all active connections and delete them
while (active_connections_.size() > 0) {
BaseConnection* conn = *(active_connections_.begin());
conn->closeConnection();
// Note:- we don't delete the connection here. They are deleted in
// the OnConnectionClose call.
}
CHECK(active_connections_.empty());
return 0;
}
void BaseServer::OnNewConnection(EventLoop::Status _status) {
if (_status == EventLoop::READ_EVENT) {
// The EventLoop indicated that the socket is writable.
// Which means that a new client has connected to it.
auto endPoint = new ConnectionEndPoint(options_.get_sin_family() != AF_INET);
struct sockaddr* serv_addr = endPoint->addr();
socklen_t addrlen = endPoint->addrlen();
sp_int32 fd = accept(listen_fd_, serv_addr, &addrlen);
endPoint->set_fd(fd);
if (endPoint->get_fd() > 0) {
// accept succeeded.
// Set defaults
if (SockUtils::setSocketDefaults(endPoint->get_fd()) < 0) {
close(endPoint->get_fd());
delete endPoint;
return;
}
// Create the connection object and register our callbacks on various events.
BaseConnection* conn = CreateConnection(endPoint, &connection_options_, eventLoop_);
auto ccb = [conn, this](NetworkErrorCode ec) { this->OnConnectionClose(conn, ec); };
conn->registerForClose(std::move(ccb));
if (conn->start() != 0) {
// Connection didn't start properly. Cleanup.
// We assume here that this particular connection went bad, so we simply return.
LOG(ERROR) << "Could not start the connection for read write";
close(endPoint->get_fd());
delete conn;
return;
}
active_connections_.insert(conn);
HandleNewConnection_Base(conn);
return;
} else {
// accept failed.
if (errno == EAGAIN) {
// This is really odd. We thought that we had a read event
LOG(ERROR) << "accept failed with EAGAIN when it should have worked. Ignoring";
} else {
LOG(ERROR) << "accept failed with errno " << errno;
}
close(endPoint->get_fd());
delete endPoint;
return;
}
} else {
// What the hell, we only registered ourselves to reading
// Just print a warning message
LOG(WARNING) << "WARNING while expecting a read event we got " << _status;
return;
}
}
void BaseServer::OnConnectionClose(BaseConnection* _connection, NetworkErrorCode _status) {
if (active_connections_.find(_connection) == active_connections_.end()) {
LOG(ERROR) << "Connection closed for an unknown connection";
_status = INVALID_CONNECTION;
} else {
active_connections_.erase(_connection);
}
HandleConnectionClose_Base(_connection, _status);
delete _connection;
}
void BaseServer::CloseConnection_Base(BaseConnection* _connection) {
InternalCloseConnection(_connection);
return;
}
void BaseServer::InternalCloseConnection(BaseConnection* _connection) {
if (active_connections_.find(_connection) == active_connections_.end()) {
LOG(ERROR) << "Got the request close an unknown connection " << _connection << "\n";
return;
}
_connection->closeConnection();
return;
}
void BaseServer::AddTimer_Base(VCallback<> cb, sp_int64 _msecs) {
InternalAddTimer(std::move(cb), _msecs);
}
void BaseServer::InternalAddTimer(VCallback<> cb, sp_int64 _msecs) {
auto eCb = [cb, this](EventLoop::Status status) { this->OnTimer(std::move(cb), status); };
CHECK_GT(eventLoop_->registerTimer(eCb, false, _msecs), 0);
}
void BaseServer::OnTimer(VCallback<> cb, EventLoop::Status) { cb(); }