| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #pragma once |
| |
| #include <cstdint> |
| #include <list> |
| #include <memory> |
| #include <string> |
| #include <unordered_map> |
| |
| #include <boost/function.hpp> // IWYU pragma: keep |
| #include <boost/intrusive/list.hpp> |
| #include <boost/intrusive/list_hook.hpp> |
| #include <ev++.h> |
| |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/rpc/connection.h" |
| #include "kudu/rpc/connection_id.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/thread.h" |
| |
| namespace kudu { |
| |
| class Sockaddr; |
| class Socket; |
| |
| namespace rpc { |
| |
| typedef std::list<scoped_refptr<Connection>> conn_list_t; |
| |
| class DumpConnectionsRequestPB; |
| class DumpConnectionsResponsePB; |
| class OutboundCall; |
| class Reactor; |
| class ReactorThread; |
| enum class CredentialsPolicy; |
| |
| // Simple metrics information from within a reactor. |
| // TODO(todd): switch these over to use util/metrics.h style metrics. |
| struct ReactorMetrics { |
| // Number of client RPC connections currently connected. |
| int32_t num_client_connections_; |
| // Number of server RPC connections currently connected. |
| int32_t num_server_connections_; |
| |
| // Total number of client RPC connections opened during Reactor's lifetime. |
| uint64_t total_client_connections_; |
| // Total number of server RPC connections opened during Reactor's lifetime. |
| uint64_t total_server_connections_; |
| }; |
| |
| // A task which can be enqueued to run on the reactor thread. |
| class ReactorTask : public boost::intrusive::list_base_hook<> { |
| public: |
| ReactorTask(); |
| |
| // Run the task. 'reactor' is guaranteed to be the current thread. |
| virtual void Run(ReactorThread *reactor) = 0; |
| |
| // Abort the task, in the case that the reactor shut down before the |
| // task could be processed. This may or may not run on the reactor thread |
| // itself. |
| // |
| // The Reactor guarantees that the Reactor lock is free when this |
| // method is called. |
| virtual void Abort(const Status &abort_status) {} |
| |
| virtual ~ReactorTask(); |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(ReactorTask); |
| }; |
| |
| // A ReactorTask that is scheduled to run at some point in the future. |
| // |
| // Semantically it works like RunFunctionTask with a few key differences: |
| // 1. The user function is called during Abort. Put another way, the |
| // user function is _always_ invoked, even during reactor shutdown. |
| // 2. To differentiate between Abort and non-Abort, the user function |
| // receives a Status as its first argument. |
| class DelayedTask : public ReactorTask { |
| public: |
| DelayedTask(boost::function<void(const Status &)> func, MonoDelta when); |
| |
| // Schedules the task for running later but doesn't actually run it yet. |
| void Run(ReactorThread* thread) override; |
| |
| // Behaves like ReactorTask::Abort. |
| void Abort(const Status& abort_status) override; |
| |
| private: |
| // libev callback for when the registered timer fires. |
| void TimerHandler(ev::timer& watcher, int revents); |
| |
| // User function to invoke when timer fires or when task is aborted. |
| const boost::function<void(const Status&)> func_; |
| |
| // Delay to apply to this task. |
| const MonoDelta when_; |
| |
| // Link back to registering reactor thread. |
| ReactorThread* thread_; |
| |
| // libev timer. Set when Run() is invoked. |
| ev::timer timer_; |
| }; |
| |
| // A ReactorThread is a libev event handler thread which manages I/O |
| // on a list of sockets. |
| // |
| // All methods in this class are _only_ called from the reactor thread itself |
| // except where otherwise specified. New methods should DCHECK(IsCurrentThread()) |
| // to ensure this. |
| class ReactorThread { |
| public: |
| friend class Connection; |
| |
| // Client-side connection map. Multiple connections could be open to a remote |
| // server if multiple credential policies or different network planes are used |
| // for individual RPCs. |
| typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>, |
| ConnectionIdHash, ConnectionIdEqual> |
| conn_multimap_t; |
| |
| ReactorThread(Reactor *reactor, const MessengerBuilder &bld); |
| |
| // This may be called from another thread. |
| Status Init(); |
| |
| // Add any connections on this reactor thread into the given status dump. |
| Status DumpConnections(const DumpConnectionsRequestPB& req, |
| DumpConnectionsResponsePB* resp); |
| |
| // Shuts down a reactor thread, optionally waiting for it to exit. |
| // Reactor::Shutdown() must have been called already. |
| // |
| // If mode == SYNC, may not be called from the reactor thread itself. |
| void Shutdown(Messenger::ShutdownMode mode); |
| |
| // This method is thread-safe. |
| void WakeThread(); |
| |
| // libev callback for handling async notifications in our epoll thread. |
| void AsyncHandler(ev::async &watcher, int revents); |
| |
| // libev callback for handling timer events in our epoll thread. |
| void TimerHandler(ev::timer &watcher, int revents); |
| |
| // Register an epoll timer watcher with our event loop. |
| // Does not set a timeout or start it. |
| void RegisterTimeout(ev::timer *watcher); |
| |
| // This may be called from another thread. |
| const std::string &name() const; |
| |
| MonoTime cur_time() const; |
| |
| // This may be called from another thread. |
| Reactor *reactor(); |
| |
| // Return true if this reactor thread is the thread currently |
| // running. Should be used in DCHECK assertions. |
| bool IsCurrentThread() const; |
| |
| // Begin the process of connection negotiation. |
| // Must be called from the reactor thread. |
| Status StartConnectionNegotiation(const scoped_refptr<Connection>& conn); |
| |
| // Transition back from negotiating to processing requests. |
| // Must be called from the reactor thread. |
| void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn, |
| const Status& status, |
| std::unique_ptr<ErrorStatusPB> rpc_error); |
| |
| // Collect metrics. |
| // Must be called from the reactor thread. |
| Status GetMetrics(ReactorMetrics *metrics); |
| |
| private: |
| friend class AssignOutboundCallTask; |
| friend class CancellationTask; |
| friend class RegisterConnectionTask; |
| friend class DelayedTask; |
| |
| // Run the main event loop of the reactor. |
| void RunThread(); |
| |
| // When libev has noticed that it needs to wake up an application watcher, |
| // it calls this callback. The callback simply calls back into libev's |
| // ev_invoke_pending() to trigger all the watcher callbacks, but |
| // wraps it with latency measurements. |
| static void InvokePendingCb(struct ev_loop* loop); |
| |
| // Similarly, libev calls these functions before/after invoking epoll_wait(). |
| // We use these to measure the amount of time spent waiting. |
| // |
| // NOTE: 'noexcept' is required to avoid compilation errors due to libev's |
| // use of the same exception specification. |
| static void AboutToPollCb(struct ev_loop* loop) noexcept; |
| static void PollCompleteCb(struct ev_loop* loop) noexcept; |
| |
| // Find a connection to the given remote and returns it in 'conn'. |
| // Returns true if a connection is found. Returns false otherwise. |
| bool FindConnection(const ConnectionId& conn_id, |
| CredentialsPolicy cred_policy, |
| scoped_refptr<Connection>* conn); |
| |
| // Find or create a new connection to the given remote. |
| // If such a connection already exists, returns that, otherwise creates a new one. |
| // May return a bad Status if the connect() call fails. |
| // The resulting connection object is managed internally by the reactor thread. |
| Status FindOrStartConnection(const ConnectionId& conn_id, |
| CredentialsPolicy cred_policy, |
| scoped_refptr<Connection>* conn); |
| |
| // Shut down the given connection, removing it from the connection tracking |
| // structures of this reactor. |
| // |
| // The connection is not explicitly deleted -- shared_ptr reference counting |
| // may hold on to the object after this, but callers should assume that it |
| // _may_ be deleted by this call. |
| void DestroyConnection(Connection *conn, const Status &conn_status, |
| std::unique_ptr<ErrorStatusPB> rpc_error = {}); |
| |
| // Scan any open connections for idle ones that have been idle longer than |
| // connection_keepalive_time_. If connection_keepalive_time_ < 0, the scan |
| // is skipped. |
| void ScanIdleConnections(); |
| |
| // Create a new client socket (non-blocking, NODELAY) |
| static Status CreateClientSocket(Socket *sock); |
| |
| // Initiate a new connection on the given socket. |
| static Status StartConnect(Socket *sock, const Sockaddr &remote); |
| |
| // Assign a new outbound call to the appropriate connection object. |
| // If this fails, the call is marked failed and completed. |
| void AssignOutboundCall(std::shared_ptr<OutboundCall> call); |
| |
| // Cancel the outbound call. May update corresponding connection |
| // object to remove call from the CallAwaitingResponse object. |
| // Also mark the call as slated for cancellation so the callback |
| // may be invoked early if the RPC hasn't yet been sent or if it's |
| // waiting for a response from the remote. |
| void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call); |
| |
| // Register a new connection. |
| void RegisterConnection(scoped_refptr<Connection> conn); |
| |
| // Actually perform shutdown of the thread, tearing down any connections, |
| // etc. This is called from within the thread. |
| void ShutdownInternal(); |
| |
| scoped_refptr<kudu::Thread> thread_; |
| |
| // our epoll object (or kqueue, etc). |
| ev::dynamic_loop loop_; |
| |
| // Used by other threads to notify the reactor thread |
| ev::async async_; |
| |
| // Handles the periodic timer. |
| ev::timer timer_; |
| |
| // Scheduled (but not yet run) delayed tasks. |
| // |
| // Each task owns its own memory and must be freed by its TaskRun and |
| // Abort members, provided it was allocated on the heap. |
| boost::intrusive::list<DelayedTask> scheduled_tasks_; |
| |
| // The current monotonic time. Updated every coarse_timer_granularity_secs_. |
| MonoTime cur_time_; |
| |
| // last time we did TCP timeouts. |
| MonoTime last_unused_tcp_scan_; |
| |
| // Map of sockaddrs to Connection objects for outbound (client) connections. |
| conn_multimap_t client_conns_; |
| |
| // List of current connections coming into the server. |
| conn_list_t server_conns_; |
| |
| Reactor *reactor_; |
| |
| // If a connection has been idle for this much time, it is torn down. |
| const MonoDelta connection_keepalive_time_; |
| |
| // Scan for idle connections on this granularity. |
| const MonoDelta coarse_timer_granularity_; |
| |
| // Metrics. |
| scoped_refptr<Histogram> invoke_us_histogram_; |
| scoped_refptr<Histogram> load_percent_histogram_; |
| |
| // Total number of client connections opened during Reactor's lifetime. |
| uint64_t total_client_conns_cnt_; |
| |
| // Total number of server connections opened during Reactor's lifetime. |
| uint64_t total_server_conns_cnt_; |
| |
| // Set prior to calling epoll and then reset back to -1 after each invocation |
| // completes. Used for accounting total_poll_cycles_. |
| int64_t cycle_clock_before_poll_ = -1; |
| |
| // The total number of cycles spent in epoll_wait() since this thread |
| // started. |
| int64_t total_poll_cycles_ = 0; |
| |
| // Random number generator for randomizing the TCP keepalive interval. |
| Random rng_; |
| |
| // Accounting for determining load average in each cycle of TimerHandler. |
| struct { |
| // The cycle-time at which the load average was last calculated. |
| int64_t time_cycles = -1; |
| // The value of total_poll_cycles_ at the last-recorded time. |
| int64_t poll_cycles = -1; |
| } last_load_measurement_; |
| }; |
| |
| // A Reactor manages a ReactorThread |
| class Reactor { |
| public: |
| Reactor(std::shared_ptr<Messenger> messenger, |
| int index, |
| const MessengerBuilder &bld); |
| Status Init(); |
| |
| // Shuts down the reactor and its corresponding thread, optionally waiting |
| // until the thread has exited. |
| void Shutdown(Messenger::ShutdownMode mode); |
| |
| ~Reactor(); |
| |
| const std::string &name() const; |
| |
| // Collect metrics about the reactor. |
| Status GetMetrics(ReactorMetrics *metrics); |
| |
| // Add any connections on this reactor thread into the given status dump. |
| Status DumpConnections(const DumpConnectionsRequestPB& req, |
| DumpConnectionsResponsePB* resp); |
| |
| // Queue a new incoming connection. Takes ownership of the underlying fd from |
| // 'socket', but not the Socket object itself. |
| // If the reactor is already shut down, takes care of closing the socket. |
| void RegisterInboundSocket(Socket *socket, const Sockaddr &remote); |
| |
| // Queue a new call to be sent. If the reactor is already shut down, marks |
| // the call as failed. |
| void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call); |
| |
| // Queue a new reactor task to cancel an outbound call. |
| void QueueCancellation(const std::shared_ptr<OutboundCall> &call); |
| |
| // Schedule the given task's Run() method to be called on the |
| // reactor thread. |
| // If the reactor shuts down before it is run, the Abort method will be |
| // called. |
| // Does _not_ take ownership of 'task' -- the task should take care of |
| // deleting itself after running if it is allocated on the heap. |
| void ScheduleReactorTask(ReactorTask *task); |
| |
| Status RunOnReactorThread(const boost::function<Status()>& f); |
| |
| // If the Reactor is closing, returns false. |
| // Otherwise, drains the pending_tasks_ queue into the provided list. |
| bool DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks); |
| |
| Messenger *messenger() const { |
| return messenger_.get(); |
| } |
| |
| // Indicates whether the reactor is shutting down. |
| // |
| // This method is thread-safe. |
| bool closing() const; |
| |
| // Is this reactor's thread the current thread? |
| bool IsCurrentThread() const { |
| return thread_.IsCurrentThread(); |
| } |
| |
| private: |
| friend class ReactorThread; |
| typedef simple_spinlock LockType; |
| mutable LockType lock_; |
| |
| // parent messenger |
| std::shared_ptr<Messenger> messenger_; |
| |
| const std::string name_; |
| |
| // Whether the reactor is shutting down. |
| // Guarded by lock_. |
| bool closing_; |
| |
| // Tasks to be run within the reactor thread. |
| // Guarded by lock_. |
| boost::intrusive::list<ReactorTask> pending_tasks_; // NOLINT(build/include_what_you_use) |
| |
| ReactorThread thread_; |
| |
| DISALLOW_COPY_AND_ASSIGN(Reactor); |
| }; |
| |
| } // namespace rpc |
| } // namespace kudu |