blob: 54f33325f8691e25afbfee727fb92bb6f8596308 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_RPC_REACTOR_H
#define KUDU_RPC_REACTOR_H
#include <boost/function.hpp>
#include <boost/intrusive/list.hpp>
#include <ev++.h>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <stdint.h>
#include <string>
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/connection.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/thread.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/status.h"
namespace kudu {
namespace rpc {
typedef std::list<scoped_refptr<Connection> > conn_list_t;
class DumpRunningRpcsRequestPB;
class DumpRunningRpcsResponsePB;
class Messenger;
class MessengerBuilder;
class Reactor;
// Simple metrics information from within a reactor.
struct ReactorMetrics {
// Number of client RPC connections currently connected.
int32_t num_client_connections_;
// Number of server RPC connections currently connected.
int32_t num_server_connections_;
};
// A task which can be enqueued to run on the reactor thread.
class ReactorTask : public boost::intrusive::list_base_hook<> {
public:
ReactorTask();
// Run the task. 'reactor' is guaranteed to be the current thread.
virtual void Run(ReactorThread *reactor) = 0;
// Abort the task, in the case that the reactor shut down before the
// task could be processed. This may or may not run on the reactor thread
// itself.
//
// The Reactor guarantees that the Reactor lock is free when this
// method is called.
virtual void Abort(const Status &abort_status) {}
virtual ~ReactorTask();
private:
DISALLOW_COPY_AND_ASSIGN(ReactorTask);
};
// A ReactorTask that is scheduled to run at some point in the future.
//
// Semantically it works like RunFunctionTask with a few key differences:
// 1. The user function is called during Abort. Put another way, the
// user function is _always_ invoked, even during reactor shutdown.
// 2. To differentiate between Abort and non-Abort, the user function
// receives a Status as its first argument.
class DelayedTask : public ReactorTask {
public:
DelayedTask(boost::function<void(const Status &)> func, MonoDelta when);
// Schedules the task for running later but doesn't actually run it yet.
virtual void Run(ReactorThread* reactor) OVERRIDE;
// Behaves like ReactorTask::Abort.
virtual void Abort(const Status& abort_status) OVERRIDE;
private:
// libev callback for when the registered timer fires.
void TimerHandler(ev::timer& watcher, int revents);
// User function to invoke when timer fires or when task is aborted.
const boost::function<void(const Status&)> func_;
// Delay to apply to this task.
const MonoDelta when_;
// Link back to registering reactor thread.
ReactorThread* thread_;
// libev timer. Set when Run() is invoked.
ev::timer timer_;
};
// A ReactorThread is a libev event handler thread which manages I/O
// on a list of sockets.
//
// All methods in this class are _only_ called from the reactor thread itself
// except where otherwise specified. New methods should DCHECK(IsCurrentThread())
// to ensure this.
class ReactorThread {
public:
friend class Connection;
// Client-side connection map.
typedef std::unordered_map<ConnectionId, scoped_refptr<Connection>,
ConnectionIdHash, ConnectionIdEqual> conn_map_t;
ReactorThread(Reactor *reactor, const MessengerBuilder &bld);
// This may be called from another thread.
Status Init();
// Add any connections on this reactor thread into the given status dump.
// May be called from another thread.
Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
DumpRunningRpcsResponsePB* resp);
// Block until the Reactor thread is shut down
//
// This must be called from another thread.
void Shutdown();
// This method is thread-safe.
void WakeThread();
// libev callback for handling async notifications in our epoll thread.
void AsyncHandler(ev::async &watcher, int revents);
// libev callback for handling timer events in our epoll thread.
void TimerHandler(ev::timer &watcher, int revents);
// Register an epoll timer watcher with our event loop.
// Does not set a timeout or start it.
void RegisterTimeout(ev::timer *watcher);
// This may be called from another thread.
const std::string &name() const;
MonoTime cur_time() const;
// This may be called from another thread.
Reactor *reactor();
// Return true if this reactor thread is the thread currently
// running. Should be used in DCHECK assertions.
bool IsCurrentThread() const;
// Begin the process of connection negotiation.
// Must be called from the reactor thread.
Status StartConnectionNegotiation(const scoped_refptr<Connection>& conn);
// Transition back from negotiating to processing requests.
// Must be called from the reactor thread.
void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
const Status& status);
// Collect metrics.
// Must be called from the reactor thread.
Status GetMetrics(ReactorMetrics *metrics);
private:
friend class AssignOutboundCallTask;
friend class RegisterConnectionTask;
friend class DelayedTask;
// Run the main event loop of the reactor.
void RunThread();
// Find or create a new connection to the given remote.
// If such a connection already exists, returns that, otherwise creates a new one.
// May return a bad Status if the connect() call fails.
// The resulting connection object is managed internally by the reactor thread.
Status FindOrStartConnection(const ConnectionId& conn_id,
scoped_refptr<Connection>* conn);
// Shut down the given connection, removing it from the connection tracking
// structures of this reactor.
//
// The connection is not explicitly deleted -- shared_ptr reference counting
// may hold on to the object after this, but callers should assume that it
// _may_ be deleted by this call.
void DestroyConnection(Connection *conn, const Status &conn_status);
// Scan any open connections for idle ones that have been idle longer than
// connection_keepalive_time_
void ScanIdleConnections();
// Create a new client socket (non-blocking, NODELAY)
static Status CreateClientSocket(Socket *sock);
// Initiate a new connection on the given socket, setting *in_progress
// to true if the connection is still pending upon return.
static Status StartConnect(Socket *sock, const Sockaddr &remote, bool *in_progress);
// Assign a new outbound call to the appropriate connection object.
// If this fails, the call is marked failed and completed.
void AssignOutboundCall(const std::shared_ptr<OutboundCall> &call);
// Register a new connection.
void RegisterConnection(const scoped_refptr<Connection>& conn);
// Actually perform shutdown of the thread, tearing down any connections,
// etc. This is called from within the thread.
void ShutdownInternal();
scoped_refptr<kudu::Thread> thread_;
// our epoll object (or kqueue, etc).
ev::dynamic_loop loop_;
// Used by other threads to notify the reactor thread
ev::async async_;
// Handles the periodic timer.
ev::timer timer_;
// Scheduled (but not yet run) delayed tasks.
//
// Each task owns its own memory and must be freed by its TaskRun and
// Abort members, provided it was allocated on the heap.
std::set<DelayedTask*> scheduled_tasks_;
// The current monotonic time. Updated every coarse_timer_granularity_secs_.
MonoTime cur_time_;
// last time we did TCP timeouts.
MonoTime last_unused_tcp_scan_;
// Map of sockaddrs to Connection objects for outbound (client) connections.
conn_map_t client_conns_;
// List of current connections coming into the server.
conn_list_t server_conns_;
Reactor *reactor_;
// If a connection has been idle for this much time, it is torn down.
const MonoDelta connection_keepalive_time_;
// Scan for idle connections on this granularity.
const MonoDelta coarse_timer_granularity_;
};
// A Reactor manages a ReactorThread
class Reactor {
public:
Reactor(const std::shared_ptr<Messenger>& messenger,
int index,
const MessengerBuilder &bld);
Status Init();
// Block until the Reactor is shut down
void Shutdown();
~Reactor();
const std::string &name() const;
// Collect metrics about the reactor.
Status GetMetrics(ReactorMetrics *metrics);
// Add any connections on this reactor thread into the given status dump.
Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
DumpRunningRpcsResponsePB* resp);
// Queue a new incoming connection. Takes ownership of the underlying fd from
// 'socket', but not the Socket object itself.
// If the reactor is already shut down, takes care of closing the socket.
void RegisterInboundSocket(Socket *socket, const Sockaddr &remote);
// Queue a new call to be sent. If the reactor is already shut down, marks
// the call as failed.
void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
// Schedule the given task's Run() method to be called on the
// reactor thread.
// If the reactor shuts down before it is run, the Abort method will be
// called.
// Does _not_ take ownership of 'task' -- the task should take care of
// deleting itself after running if it is allocated on the heap.
void ScheduleReactorTask(ReactorTask *task);
Status RunOnReactorThread(const boost::function<Status()>& f);
// If the Reactor is closing, returns false.
// Otherwise, drains the pending_tasks_ queue into the provided list.
bool DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks);
Messenger *messenger() const {
return messenger_.get();
}
// Indicates whether the reactor is shutting down.
//
// This method is thread-safe.
bool closing() const;
// Is this reactor's thread the current thread?
bool IsCurrentThread() const {
return thread_.IsCurrentThread();
}
private:
friend class ReactorThread;
typedef simple_spinlock LockType;
mutable LockType lock_;
// parent messenger
std::shared_ptr<Messenger> messenger_;
const std::string name_;
// Whether the reactor is shutting down.
// Guarded by lock_.
bool closing_;
// Tasks to be run within the reactor thread.
// Guarded by lock_.
boost::intrusive::list<ReactorTask> pending_tasks_; // NOLINT(build/include_what_you_use)
ReactorThread thread_;
DISALLOW_COPY_AND_ASSIGN(Reactor);
};
} // namespace rpc
} // namespace kudu
#endif