blob: f06166d5ecd14d54dfdcfb6ca1602c39d930ac32 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "asio_net_provider.h"
#include <boost/asio.hpp> // IWYU pragma: keep
#include <boost/asio/basic_datagram_socket.hpp>
#include <boost/asio/basic_socket_acceptor.hpp>
#include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/detail/impl/reactive_socket_service_base.ipp>
#include <boost/asio/detail/impl/service_registry.hpp>
#include <boost/asio/impl/io_context.hpp>
#include <boost/asio/impl/io_context.ipp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/asio/ip/basic_endpoint.hpp>
#include <boost/asio/ip/impl/address.ipp>
#include <boost/asio/ip/impl/address_v4.ipp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/socket_base.hpp>
#include <boost/system/error_code.hpp>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <cstddef>
#include <limits>
#include <memory>
#include <new>
#include "asio_rpc_session.h"
// IWYU pragma: no_include "boost/asio/basic_datagram_socket.hpp"
// IWYU pragma: no_include "boost/asio/basic_socket_acceptor.hpp"
// IWYU pragma: no_include "boost/asio/basic_stream_socket.hpp"
// IWYU pragma: no_include "boost/asio/buffer.hpp"
// IWYU pragma: no_include "boost/asio/detail/impl/reactive_socket_service_base.ipp"
// IWYU pragma: no_include "boost/asio/detail/impl/service_registry.hpp"
// IWYU pragma: no_include "boost/asio/impl/io_context.hpp"
// IWYU pragma: no_include "boost/asio/impl/io_context.ipp"
// IWYU pragma: no_include "boost/asio/io_context.hpp"
// IWYU pragma: no_include "boost/asio/ip/address.hpp"
// IWYU pragma: no_include "boost/asio/ip/address_v4.hpp"
// IWYU pragma: no_include "boost/asio/ip/basic_endpoint.hpp"
// IWYU pragma: no_include "boost/asio/ip/impl/address.ipp"
// IWYU pragma: no_include "boost/asio/ip/impl/address_v4.ipp"
// IWYU pragma: no_include "boost/asio/socket_base.hpp"
// IWYU pragma: no_include "boost/system/error_code.hpp
#include "runtime/task/task.h"
#include "runtime/task/task_worker.h"
#include "runtime/tool_api.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/customizable_id.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/rand.h"
namespace dsn {
class rpc_engine;
namespace tools {
DSN_DEFINE_uint32(network,
io_service_worker_count,
1,
"thread number for io service (timer and boost network)");
const int threads_per_event_loop = 1;
asio_network_provider::asio_network_provider(rpc_engine *srv, network *inner_provider)
: connection_oriented_network(srv, inner_provider), _acceptor(nullptr)
{
for (auto i = 0; i < FLAGS_io_service_worker_count; i++) {
// Using thread-local operation queues in single-threaded use cases (i.e. when
// concurrency_hint is 1) to eliminate a lock/unlock pair.
_io_services.emplace_back(
std::make_unique<boost::asio::io_service>(threads_per_event_loop));
}
}
asio_network_provider::~asio_network_provider()
{
if (_acceptor) {
_acceptor->close();
}
for (auto &io_service : _io_services) {
io_service->stop();
}
for (auto &w : _workers) {
w->join();
}
}
error_code asio_network_provider::start(rpc_channel channel, int port, bool client_only)
{
if (_acceptor != nullptr)
return ERR_SERVICE_ALREADY_RUNNING;
for (int i = 0; i < FLAGS_io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);
const char *name = ::dsn::tools::get_service_node_name(node());
char buffer[128];
sprintf(buffer, "%s.asio.%d", name, i);
task_worker::set_name(buffer);
boost::asio::io_service::work work(*_io_services[i]);
boost::system::error_code ec;
_io_services[i]->run(ec);
CHECK(!ec, "boost::asio::io_service run failed: err({})", ec.message());
}));
}
_acceptor = nullptr;
CHECK(channel == RPC_CHANNEL_TCP || channel == RPC_CHANNEL_UDP,
"invalid given channel {}",
channel);
_address.assign_ipv4(get_local_ipv4(), port);
if (!client_only) {
auto v4_addr = boost::asio::ip::address_v4::any(); //(ntohl(_address.ip));
::boost::asio::ip::tcp::endpoint endpoint(v4_addr, _address.port());
boost::system::error_code ec;
_acceptor.reset(new boost::asio::ip::tcp::acceptor(get_io_service()));
_acceptor->open(endpoint.protocol(), ec);
if (ec) {
LOG_ERROR("asio tcp acceptor open failed, error = {}", ec.message());
_acceptor.reset();
return ERR_NETWORK_INIT_FAILED;
}
_acceptor->set_option(boost::asio::socket_base::reuse_address(true));
_acceptor->bind(endpoint, ec);
if (ec) {
LOG_ERROR("asio tcp acceptor bind failed, error = {}", ec.message());
_acceptor.reset();
return ERR_NETWORK_INIT_FAILED;
}
int backlog = boost::asio::socket_base::max_connections;
_acceptor->listen(backlog, ec);
if (ec) {
LOG_ERROR("asio tcp acceptor listen failed, port = {}, error = {}",
_address.port(),
ec.message());
_acceptor.reset();
return ERR_NETWORK_INIT_FAILED;
}
do_accept();
}
return ERR_OK;
}
rpc_session_ptr asio_network_provider::create_client_session(::dsn::rpc_address server_addr)
{
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(get_io_service());
message_parser_ptr parser(new_message_parser(_client_hdr_format));
return rpc_session_ptr(new asio_rpc_session(*this, server_addr, sock, parser, true));
}
void asio_network_provider::do_accept()
{
auto socket = std::make_shared<boost::asio::ip::tcp::socket>(get_io_service());
_acceptor->async_accept(*socket, [this, socket](boost::system::error_code ec) {
if (!ec) {
auto remote = socket->remote_endpoint(ec);
if (ec) {
LOG_ERROR("failed to get the remote endpoint: {}", ec.message());
} else {
auto ip = remote.address().to_v4().to_ulong();
auto port = remote.port();
::dsn::rpc_address client_addr(ip, port);
message_parser_ptr null_parser;
rpc_session_ptr s =
new asio_rpc_session(*this,
client_addr,
(std::shared_ptr<boost::asio::ip::tcp::socket> &)socket,
null_parser,
false);
// when server connection threshold is hit, close the session, otherwise accept it
if (check_if_conn_threshold_exceeded(s->remote_address())) {
LOG_WARNING("close rpc connection from {} to {} due to hitting server "
"connection threshold per ip",
s->remote_address(),
address());
s->close();
} else {
on_server_session_accepted(s);
// we should start read immediately after the rpc session is completely created.
s->start_read_next();
}
}
}
do_accept();
});
}
const size_t asio_udp_provider::max_udp_packet_size = 1000;
void asio_udp_provider::send_message(message_ex *request)
{
auto parser = get_message_parser(request->hdr_format);
parser->prepare_on_send(request);
auto lcount = parser->get_buffer_count_on_send(request);
std::unique_ptr<message_parser::send_buf[]> bufs(new message_parser::send_buf[lcount]);
auto rcount = parser->get_buffers_on_send(request, bufs.get());
CHECK_GE(lcount, rcount);
size_t tlen = 0, offset = 0;
for (int i = 0; i < rcount; i++) {
tlen += bufs[i].sz;
}
CHECK_LE_MSG(tlen, max_udp_packet_size, "the message is too large to send via a udp channel");
std::unique_ptr<char[]> packet_buffer(new char[tlen]);
for (int i = 0; i < rcount; i++) {
memcpy(&packet_buffer[offset], bufs[i].buf, bufs[i].sz);
offset += bufs[i].sz;
};
::boost::asio::ip::udp::endpoint ep(::boost::asio::ip::address_v4(request->to_address.ip()),
request->to_address.port());
_socket->async_send_to(
::boost::asio::buffer(packet_buffer.get(), tlen),
ep,
[=](const boost::system::error_code &error, std::size_t bytes_transferred) {
if (error) {
LOG_WARNING("send udp packet to ep {}:{} failed, message = {}",
ep.address().to_string(),
ep.port(),
error.message());
// we do not handle failure here, rpc matcher would handle timeouts
}
});
request->add_ref();
request->release_ref();
}
asio_udp_provider::asio_udp_provider(rpc_engine *srv, network *inner_provider)
: network(srv, inner_provider), _is_client(false), _recv_reader(_message_buffer_block_size)
{
_parsers = new message_parser *[network_header_format::max_value() + 1];
memset(_parsers, 0, sizeof(message_parser *) * (network_header_format::max_value() + 1));
}
asio_udp_provider::~asio_udp_provider()
{
for (int i = 0; i <= network_header_format::max_value(); i++) {
if (_parsers[i] != nullptr) {
delete _parsers[i];
_parsers[i] = nullptr;
}
}
delete[] _parsers;
_parsers = nullptr;
_io_service.stop();
for (auto &w : _workers) {
w->join();
}
}
message_parser *asio_udp_provider::get_message_parser(network_header_format hdr_format)
{
if (_parsers[hdr_format] == nullptr) {
utils::auto_lock<utils::ex_lock_nr> l(_lock);
if (_parsers[hdr_format] == nullptr) // double check
{
_parsers[hdr_format] = new_message_parser(hdr_format);
}
}
return _parsers[hdr_format];
}
void asio_udp_provider::do_receive()
{
std::shared_ptr<::boost::asio::ip::udp::endpoint> send_endpoint(
new ::boost::asio::ip::udp::endpoint);
_recv_reader.truncate_read();
auto buffer_ptr = _recv_reader.read_buffer_ptr(max_udp_packet_size);
CHECK_GE_MSG(_recv_reader.read_buffer_capacity(),
max_udp_packet_size,
"failed to load enough buffer in parser");
_socket->async_receive_from(
::boost::asio::buffer(buffer_ptr, max_udp_packet_size),
*send_endpoint,
[this, send_endpoint](const boost::system::error_code &error,
std::size_t bytes_transferred) {
if (!!error) {
LOG_ERROR("{}: asio udp read failed: {}", _address, error.message());
do_receive();
return;
}
if (bytes_transferred < sizeof(uint32_t)) {
LOG_ERROR("{}: asio udp read failed: too short message", _address);
do_receive();
return;
}
auto hdr_format = message_parser::get_header_type(_recv_reader._buffer.data());
if (NET_HDR_INVALID == hdr_format) {
LOG_ERROR("{}: asio udp read failed: invalid header type '{}'",
_address,
message_parser::get_debug_string(_recv_reader._buffer.data()));
do_receive();
return;
}
auto parser = get_message_parser(hdr_format);
parser->reset();
_recv_reader.mark_read(bytes_transferred);
int read_next = -1;
message_ex *msg = parser->get_message_on_receive(&_recv_reader, read_next);
if (msg == nullptr) {
LOG_ERROR("{}: asio udp read failed: invalid udp packet", _address);
do_receive();
return;
}
msg->to_address = _address;
if (msg->header->context.u.is_request) {
on_recv_request(msg, 0);
} else {
on_recv_reply(msg->header->id, msg, 0);
}
do_receive();
});
}
error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_only)
{
_is_client = client_only;
CHECK_EQ(channel, RPC_CHANNEL_UDP);
if (client_only) {
do {
// FIXME: we actually do not need to set a random port for client if the rpc_engine is
// refactored
_address.assign_ipv4(get_local_ipv4(),
std::numeric_limits<uint16_t>::max() -
rand::next_u64(std::numeric_limits<uint64_t>::min(),
std::numeric_limits<uint64_t>::max()) %
5000);
::boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4::any(),
_address.port());
boost::system::error_code ec;
_socket.reset(new ::boost::asio::ip::udp::socket(_io_service));
_socket->open(endpoint.protocol(), ec);
if (ec) {
LOG_ERROR("asio udp socket open failed, error = {}", ec.message());
_socket.reset();
continue;
}
_socket->bind(endpoint, ec);
if (ec) {
LOG_ERROR("asio udp socket bind failed, port = {}, error = {}",
_address.port(),
ec.message());
_socket.reset();
continue;
}
break;
} while (true);
} else {
_address.assign_ipv4(get_local_ipv4(), port);
::boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4::any(),
_address.port());
boost::system::error_code ec;
_socket.reset(new ::boost::asio::ip::udp::socket(_io_service));
_socket->open(endpoint.protocol(), ec);
if (ec) {
LOG_ERROR("asio udp socket open failed, error = {}", ec.message());
_socket.reset();
return ERR_NETWORK_INIT_FAILED;
}
_socket->bind(endpoint, ec);
if (ec) {
LOG_ERROR("asio udp socket bind failed, port = {}, error = {}",
_address.port(),
ec.message());
_socket.reset();
return ERR_NETWORK_INIT_FAILED;
}
}
for (int i = 0; i < FLAGS_io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);
const char *name = ::dsn::tools::get_service_node_name(node());
char buffer[128];
sprintf(buffer, "%s.asio.udp.%d.%d", name, (int)(this->address().port()), i);
task_worker::set_name(buffer);
boost::asio::io_service::work work(_io_service);
boost::system::error_code ec;
_io_service.run(ec);
CHECK(!ec, "boost::asio::io_service run failed: err({})", ec.message());
}));
}
do_receive();
return ERR_OK;
}
// use a round-robin scheme to choose the next io_service to use.
boost::asio::io_service &asio_network_provider::get_io_service()
{
return *_io_services[rand::next_u32(0, FLAGS_io_service_worker_count - 1)];
}
} // namespace tools
} // namespace dsn