| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| #pragma once |
| |
| #include <stddef.h> |
| #include <memory> |
| #include <thread> |
| #include <vector> |
| |
| #include "boost/asio/io_service.hpp" |
| #include "boost/asio/ip/tcp.hpp" |
| #include "boost/asio/ip/udp.hpp" |
| #include "runtime/rpc/message_parser.h" |
| #include "runtime/rpc/network.h" |
| #include "runtime/rpc/rpc_address.h" |
| #include "runtime/rpc/rpc_message.h" |
| #include "runtime/task/task_spec.h" |
| #include "utils/error_code.h" |
| #include "utils/synchronize.h" |
| |
| namespace dsn { |
| class rpc_engine; |
| |
| namespace tools { |
| |
/// asio_network_provider is a wrapper of the Asio library for rDSN that accepts connections and
/// creates sockets. Each io_service is polled by exactly one thread, so all operations on a given
/// socket are always performed in a single thread. We create many io_service instances to take
/// advantage of the multi-core capabilities of the processor, and use a round-robin scheme to
/// decide which io_service each socket is assigned to.
///
/// +-----------------------------------------------+
/// |Linux kernel                                   |
/// | +-----------+   +-----------+   +-----------+ |
/// | |  Epoll1   |   |  Epoll2   |   |  Epoll3   | |
/// | |           |   |           |   |           | |
/// | | rfd 1,2,3 |   | rfd 4,5,6 |   | rfd 7,8,9 | |
/// | |           |   |           |   |           | |
/// | +-----^-----+   +-----^-----+   +-----^-----+ |
/// +-------|---------------|---------------|-------+
///   +-----------+   +-----------+   +-----------+
///   |  polling  |   |  polling  |   |  polling  |
///   | +-------+ |   | +-------+ |   | +-------+ |
///   | |Thread1| |   | |Thread2| |   | |Thread3| |
///   | +-------+ |   | +-------+ |   | +-------+ |
///   |io_service1|   |io_service2|   |io_service3|
///   +-----------+   +-----------+   +-----------+
| |
| class asio_network_provider : public connection_oriented_network |
| { |
| public: |
| asio_network_provider(rpc_engine *srv, network *inner_provider); |
| |
| ~asio_network_provider() override; |
| |
| virtual error_code start(rpc_channel channel, int port, bool client_only) override; |
| virtual ::dsn::rpc_address address() override { return _address; } |
| virtual rpc_session_ptr create_client_session(::dsn::rpc_address server_addr) override; |
| |
| private: |
| void do_accept(); |
| boost::asio::io_service &get_io_service(); |
| |
| private: |
| friend class asio_rpc_session; |
| friend class asio_network_provider_test; |
| |
| std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor; |
| std::vector<std::unique_ptr<boost::asio::io_service>> _io_services; |
| std::vector<std::shared_ptr<std::thread>> _workers; |
| ::dsn::rpc_address _address; |
| }; |
| |
| // TODO(Tangyanzhao): change the network model like asio_network_provider |
| class asio_udp_provider : public network |
| { |
| public: |
| asio_udp_provider(rpc_engine *srv, network *inner_provider); |
| |
| ~asio_udp_provider() override; |
| |
| void send_message(message_ex *request) override; |
| |
| virtual error_code start(rpc_channel channel, int port, bool client_only) override; |
| |
| virtual ::dsn::rpc_address address() override { return _address; } |
| |
| virtual void inject_drop_message(message_ex *msg, bool is_send) override |
| { |
| // nothing to do for UDP |
| } |
| |
| private: |
| void do_receive(); |
| |
| // create parser on demand |
| message_parser *get_message_parser(network_header_format hdr_format); |
| |
| bool _is_client; |
| boost::asio::io_service _io_service; |
| std::shared_ptr<boost::asio::ip::udp::socket> _socket; |
| std::vector<std::shared_ptr<std::thread>> _workers; |
| ::dsn::rpc_address _address; |
| message_reader _recv_reader; |
| |
| ::dsn::utils::ex_lock_nr _lock; // [ |
| message_parser **_parsers; |
| // ] |
| |
| static const size_t max_udp_packet_size; |
| }; |
| |
| } // namespace tools |
| } // namespace dsn |