// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>

#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
#include <ev++.h>

#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/connection.h"
#include "kudu/rpc/connection_id.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"

namespace kudu {

class Sockaddr;
class Socket;

namespace rpc {

// A list of reference-counted connection handles (used, for example, to track
// a reactor thread's inbound server connections).
using conn_list_t = std::list<scoped_refptr<Connection>>;

class DumpConnectionsRequestPB;
class DumpConnectionsResponsePB;
class OutboundCall;
class Reactor;
class ReactorThread;

enum class CredentialsPolicy;

// Simple metrics information from within a reactor.
// TODO(todd): switch these over to use util/metrics.h style metrics.
//
// Every counter has a default member initializer of zero so that a
// default-constructed instance never exposes indeterminate values (e.g. if
// it is read before being filled in by GetMetrics()).
struct ReactorMetrics {
  // Number of client RPC connections currently connected.
  int32_t num_client_connections_ = 0;
  // Number of server RPC connections currently connected.
  int32_t num_server_connections_ = 0;

  // Total number of client RPC connections opened during Reactor's lifetime.
  uint64_t total_client_connections_ = 0;
  // Total number of server RPC connections opened during Reactor's lifetime.
  uint64_t total_server_connections_ = 0;
};

// Base class for a unit of work that is handed to a Reactor and executed on
// its reactor thread. Each task carries its own intrusive list hook, so it can
// be linked directly into the Reactor's list of pending tasks.
//
// Tasks are neither copyable nor assignable.
class ReactorTask : public boost::intrusive::list_base_hook<> {
 public:
  ReactorTask();

  // Performs the work. Always invoked on the reactor thread; 'reactor' is
  // that very thread.
  virtual void Run(ReactorThread* reactor) = 0;

  // Invoked in place of Run() when the reactor shut down before this task
  // got a chance to be processed. The caller is not necessarily the reactor
  // thread.
  //
  // The Reactor lock is guaranteed not to be held while this is called.
  // The default implementation does nothing.
  virtual void Abort(const Status& /* abort_status */) {}

  virtual ~ReactorTask();

 private:
  DISALLOW_COPY_AND_ASSIGN(ReactorTask);
};

// A ReactorTask that is scheduled to run at some point in the future.
//
// Semantically it works like RunFunctionTask with a few key differences:
// 1. The user function is called during Abort. Put another way, the
//    user function is _always_ invoked, even during reactor shutdown.
// 2. To differentiate between Abort and non-Abort, the user function
//    receives a Status as its first argument.
class DelayedTask : public ReactorTask {
 public:
  // 'func' is the user function invoked either when the timer fires or when
  // the task is aborted. 'when' is the delay to apply to this task.
  DelayedTask(std::function<void(const Status&)> func, MonoDelta when);

  // Schedules the task for running later but doesn't actually run it yet.
  // 'thread' is the reactor thread the task is registered with; the libev
  // timer_ is set up at this point using the 'when_' delay.
  void Run(ReactorThread* thread) override;

  // Unlike the ReactorTask default, this still invokes the user function
  // (see point 1 above), passing it a Status that lets it recognize the
  // abort (see point 2 above).
  void Abort(const Status& abort_status) override;

 private:
  // libev callback for when the registered timer fires.
  void TimerHandler(ev::timer& watcher, int revents);

  // User function to invoke when timer fires or when task is aborted.
  const std::function<void(const Status&)> func_;

  // Delay to apply to this task.
  const MonoDelta when_;

  // Link back to registering reactor thread. The in-class initializer keeps
  // this pointer from being indeterminate before Run() records the thread.
  ReactorThread* thread_ = nullptr;

  // libev timer. Set when Run() is invoked.
  ev::timer timer_;
};

// A ReactorThread is a libev event handler thread which manages I/O
// on a list of sockets.
//
// All methods in this class are _only_ called from the reactor thread itself
// except where otherwise specified. New methods should DCHECK(IsCurrentThread())
// to ensure this.
//
// Instances are neither copyable nor assignable: each one embeds its own libev
// event loop and holds references to the connections it manages.
class ReactorThread {
 public:
  friend class Connection;

  // Client-side connection map. Multiple connections could be open to a remote
  // server if multiple credential policies or different network planes are used
  // for individual RPCs.
  typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>,
                                  ConnectionIdHash, ConnectionIdEqual>
      conn_multimap_t;

  ReactorThread(Reactor* reactor, const MessengerBuilder& bld);

  // This may be called from another thread.
  Status Init();

  // Add any connections on this reactor thread into the given status dump.
  Status DumpConnections(const DumpConnectionsRequestPB& req,
                         DumpConnectionsResponsePB* resp);

  // Shuts down a reactor thread, optionally waiting for it to exit.
  // Reactor::Shutdown() must have been called already.
  //
  // If mode == SYNC, may not be called from the reactor thread itself.
  void Shutdown(Messenger::ShutdownMode mode);

  // This method is thread-safe.
  void WakeThread();

  // libev callback for handling async notifications in our epoll thread.
  void AsyncHandler(ev::async& watcher, int revents);

  // libev callback for handling timer events in our epoll thread.
  void TimerHandler(ev::timer& watcher, int revents);

  // Register an epoll timer watcher with our event loop.
  // Does not set a timeout or start it.
  void RegisterTimeout(ev::timer* watcher);

  // This may be called from another thread.
  const std::string& name() const;

  MonoTime cur_time() const;

  // This may be called from another thread.
  Reactor* reactor();

  // Return true if this reactor thread is the thread currently
  // running. Should be used in DCHECK assertions.
  bool IsCurrentThread() const;

  // Begin the process of connection negotiation.
  // Must be called from the reactor thread.
  Status StartConnectionNegotiation(const scoped_refptr<Connection>& conn);

  // Transition back from negotiating to processing requests.
  // Must be called from the reactor thread.
  void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
                                     const Status& status,
                                     std::unique_ptr<ErrorStatusPB> rpc_error);

  // Collect metrics.
  // Must be called from the reactor thread.
  Status GetMetrics(ReactorMetrics* metrics);

 private:
  friend class AssignOutboundCallTask;
  friend class CancellationTask;
  friend class RegisterConnectionTask;
  friend class DelayedTask;

  // Run the main event loop of the reactor.
  void RunThread();

  // When libev has noticed that it needs to wake up an application watcher,
  // it calls this callback. The callback simply calls back into libev's
  // ev_invoke_pending() to trigger all the watcher callbacks, but
  // wraps it with latency measurements.
  static void InvokePendingCb(struct ev_loop* loop);

  // Similarly, libev calls these functions before/after invoking epoll_wait().
  // We use these to measure the amount of time spent waiting.
  //
  // NOTE: 'noexcept' is required to avoid compilation errors due to libev's
  // use of the same exception specification.
  static void AboutToPollCb(struct ev_loop* loop) noexcept;
  static void PollCompleteCb(struct ev_loop* loop) noexcept;

  // Find a connection to the given remote and returns it in 'conn'.
  // Returns true if a connection is found. Returns false otherwise.
  bool FindConnection(const ConnectionId& conn_id,
                      CredentialsPolicy cred_policy,
                      scoped_refptr<Connection>* conn);

  // Find or create a new connection to the given remote.
  // If such a connection already exists, returns that, otherwise creates a new one.
  // May return a bad Status if the connect() call fails.
  // The resulting connection object is managed internally by the reactor thread.
  Status FindOrStartConnection(const ConnectionId& conn_id,
                               CredentialsPolicy cred_policy,
                               scoped_refptr<Connection>* conn);

  // Shut down the given connection, removing it from the connection tracking
  // structures of this reactor.
  //
  // The connection is not explicitly deleted -- scoped_refptr reference
  // counting may hold on to the object after this, but callers should assume
  // that it _may_ be deleted by this call.
  void DestroyConnection(Connection* conn, const Status& conn_status,
                         std::unique_ptr<ErrorStatusPB> rpc_error = {});

  // Scan any open connections for idle ones that have been idle longer than
  // connection_keepalive_time_. If connection_keepalive_time_ < 0, the scan
  // is skipped.
  void ScanIdleConnections();

  // Create a new client socket (non-blocking, NODELAY)
  static Status CreateClientSocket(int family, Socket* sock);

  // Initiate a new connection on the given socket.
  static Status StartConnect(Socket* sock, const Sockaddr& remote);

  // Assign a new outbound call to the appropriate connection object.
  // If this fails, the call is marked failed and completed.
  void AssignOutboundCall(std::shared_ptr<OutboundCall> call);

  // Cancel the outbound call. May update corresponding connection
  // object to remove call from the CallAwaitingResponse object.
  // Also mark the call as slated for cancellation so the callback
  // may be invoked early if the RPC hasn't yet been sent or if it's
  // waiting for a response from the remote.
  void CancelOutboundCall(const std::shared_ptr<OutboundCall>& call);

  // Register a new connection.
  void RegisterConnection(scoped_refptr<Connection> conn);

  // Actually perform shutdown of the thread, tearing down any connections,
  // etc. This is called from within the thread.
  void ShutdownInternal();

  scoped_refptr<kudu::Thread> thread_;

  // our epoll object (or kqueue, etc).
  ev::dynamic_loop loop_;

  // Used by other threads to notify the reactor thread
  ev::async async_;

  // Handles the periodic timer.
  ev::timer timer_;

  // Scheduled (but not yet run) delayed tasks.
  //
  // Each task owns its own memory: a heap-allocated task is responsible for
  // freeing itself once it has run or has been aborted.
  boost::intrusive::list<DelayedTask> scheduled_tasks_;

  // The current monotonic time. Updated every coarse_timer_granularity_.
  MonoTime cur_time_;

  // last time we did TCP timeouts.
  MonoTime last_unused_tcp_scan_;

  // Map of ConnectionIds to Connection objects for outbound (client)
  // connections.
  conn_multimap_t client_conns_;

  // List of current connections coming into the server.
  conn_list_t server_conns_;

  Reactor* reactor_;

  // If a connection has been idle for this much time, it is torn down.
  const MonoDelta connection_keepalive_time_;

  // Scan for idle connections on this granularity.
  const MonoDelta coarse_timer_granularity_;

  // Metrics.
  scoped_refptr<Histogram> invoke_us_histogram_;
  scoped_refptr<Histogram> load_percent_histogram_;

  // Total number of client connections opened during Reactor's lifetime.
  uint64_t total_client_conns_cnt_ = 0;

  // Total number of server connections opened during Reactor's lifetime.
  uint64_t total_server_conns_cnt_ = 0;

  // Set prior to calling epoll and then reset back to -1 after each invocation
  // completes. Used for accounting total_poll_cycles_.
  int64_t cycle_clock_before_poll_ = -1;

  // The total number of cycles spent in epoll_wait() since this thread
  // started.
  int64_t total_poll_cycles_ = 0;

  // Random number generator for randomizing the TCP keepalive interval.
  Random rng_;

  // Accounting for determining load average in each cycle of TimerHandler.
  struct {
    // The cycle-time at which the load average was last calculated.
    int64_t time_cycles = -1;
    // The value of total_poll_cycles_ at the last-recorded time.
    int64_t poll_cycles = -1;
  } last_load_measurement_;

  DISALLOW_COPY_AND_ASSIGN(ReactorThread);
};

// A Reactor manages exactly one ReactorThread. Other threads use it to hand
// work (inbound sockets, outbound calls, cancellations, arbitrary tasks) to
// that thread; the hand-off queue is protected by an internal spinlock.
class Reactor {
 public:
  Reactor(std::shared_ptr<Messenger> messenger,
          int index,
          const MessengerBuilder& bld);
  ~Reactor();

  Status Init();

  // Shuts the reactor and its thread down; depending on 'mode', blocks until
  // that thread has exited.
  void Shutdown(Messenger::ShutdownMode mode);

  const std::string& name() const;

  // The messenger this reactor belongs to (not owned by the caller).
  Messenger* messenger() const { return messenger_.get(); }

  // Returns true once the reactor has started shutting down.
  // Safe to call from any thread.
  bool closing() const;

  // Returns true iff the calling thread is this reactor's thread.
  bool IsCurrentThread() const { return thread_.IsCurrentThread(); }

  // Fills in 'metrics' with statistics about this reactor.
  Status GetMetrics(ReactorMetrics* metrics);

  // Appends the connections handled by this reactor's thread to 'resp'.
  Status DumpConnections(const DumpConnectionsRequestPB& req,
                         DumpConnectionsResponsePB* resp);

  // Hands a newly accepted connection to the reactor. The underlying fd of
  // 'socket' is taken over, while the Socket object itself stays with the
  // caller. When the reactor has already shut down, the socket is closed here.
  void RegisterInboundSocket(Socket* socket, const Sockaddr& remote);

  // Hands 'call' to the reactor for sending. When the reactor has already
  // shut down, the call is marked as failed instead.
  void QueueOutboundCall(std::shared_ptr<OutboundCall> call);

  // Enqueues a reactor task that cancels 'call'.
  void QueueCancellation(std::shared_ptr<OutboundCall> call);

  // Arranges for task->Run() to be called on the reactor thread, or for
  // task->Abort() to be called if the reactor shuts down first.
  // Ownership of 'task' is NOT transferred: a heap-allocated task must delete
  // itself once it has run.
  void ScheduleReactorTask(ReactorTask* task);

  Status RunOnReactorThread(std::function<Status()> f);

  // Returns false if the reactor is closing. Otherwise, moves the contents of
  // the pending_tasks_ queue into 'tasks'.
  bool DrainTaskQueue(boost::intrusive::list<ReactorTask>* tasks);

 private:
  friend class ReactorThread;

  using LockType = simple_spinlock;
  mutable LockType lock_;

  // Shared reference to the parent messenger.
  std::shared_ptr<Messenger> messenger_;

  const std::string name_;

  // Set once the reactor begins shutting down. Protected by lock_.
  bool closing_;

  // Tasks waiting to be run on the reactor thread. Protected by lock_.
  boost::intrusive::list<ReactorTask> pending_tasks_; // NOLINT(build/include_what_you_use)

  ReactorThread thread_;

  DISALLOW_COPY_AND_ASSIGN(Reactor);
};

} // namespace rpc
} // namespace kudu
