// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <google/protobuf/message.h>

#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/request_tracker.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/malloc.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/monotime.h"

namespace kudu {

class Status;
class Thread;

namespace rpc {
class RpcContext;

// A ResultTracker for RPC results.
//
// The ResultTracker is responsible for tracking the results of RPCs and making sure that
// client calls with the same client ID and sequence number (first attempt and subsequent retries)
// are executed exactly once.
//
// In most cases, the use of ResultTracker is internal to the RPC system: RPCs are tracked when
// they first arrive, before service methods are called, and calls to ResultTracker to store
// responses are performed internally by RpcContext. The exception is when an RPC is replicated
// across multiple servers, such as with writes, in which case direct interaction with the result
// tracker is required so as to cache responses on replicas which did not receive the RPC directly
// from the client.
//
// Throughout this header and elsewhere we use the following terms:
//
// RPC - The operation that a client or another server wants to execute on this server. The client
//       might attempt one RPC many times, for instance if failures or timeouts happen.
// Attempt - Each individual attempt of an RPC on the server.
// Handler - A thread executing an attempt. Usually there is only one handler that executes the
//           first attempt of an RPC and, when it completes, replies to its own attempt and to all
//           other attempts that might have arrived after it started.
// Driver - Only important in cases where there might be multiple handlers (e.g. in replicated
//          RPCs). In these cases there might be two handlers executing the same RPC, corresponding
//          to different attempts. Since the RPC must be executed exactly once, only one of the
//          handlers must be selected as the "driver" and actually perform the operation.
//
// If a client wishes to track the result of a given RPC it must send on the RPC header
// a RequestId with the following information:
//
//       Client ID - Uniquely identifies a single client. All the RPCs originating from the same
//                   client must have the same ID.
// Sequence number - Uniquely identifies a single RPC, even across retries to multiple servers, for
//                   replicated RPCs. All retries of the same RPC must have the same sequence
//                   number.
//  Attempt number - Uniquely identifies each retry of the same RPC. All retries of the same RPC
//                   must have different attempt numbers.
//
// When a call first arrives from the client the RPC subsystem will call TrackRpc() which
// will return the state of the RPC in the form of an RpcState enum.
//
// If the ResultTracker returns NEW, this signals that it's the first time the server has heard
// of the RPC and that the corresponding server function should be executed.
//
// If anything other than NEW is returned it means that the call has either previously completed or
// is in the process of being executed. In this case the caller should _not_ execute the function
// corresponding to the RPC. The ResultTracker itself will take care of responding to the client
// appropriately. If the RPC was already completed, the ResultTracker replies to the client
// immediately. If the RPC is still ongoing, the attempt gets "attached" to the ongoing one and will
// receive the same response when its handler finishes.
//
// If handling of the RPC is successful, RecordCompletionAndRespond() must be called
// to register successful completion, in which case all pending or future RPCs with the same
// sequence number, from the same client, will receive the same response.
//
// On the other hand, if execution of the server function is not successful then one of
// the FailAndRespond() methods should be called, causing all _pending_ attempts to receive the same
// error. However this error is not stored, any future attempt with the same sequence number and
// same client ID will be given a new chance to execute, as if it it had never been tried before.
// This gives the client a chance to either retry (if the failure reason is transient) or give up.
//
// ============================================================================
// RPCs with multiple handlers
// ============================================================================
//
// Some RPCs results are tracked by single server, i.e. they correspond to the modification of an
// unreplicated resource and are unpersisted. For those no additional care needs to be taken, the
// first attempt will be the only handler, and subsequent attempts will receive the response when
// that first attempt is done.
// However some RPCs are replicated across servers, using consensus, and thus can have multiple
// handlers executing different attempts at the same time, e.g. one handler from a client
// originating retry, and one from a previous leader originating update.
//
// In this case we need to make sure that the following invariants are enforced:
// - Only one handler can actually record a response, the "driver" handler.
// - Only one handler must respond to "attached" attempts.
// - Each handler replies to their own RPCs, to avoid races. That is, a live handler should
//   not mutate another live handler's response/context.
//
// This is achieved by naming one handler the "driver" of the RPC and making sure that only
// the driver can successfully complete it, i.e. call RecordCompletionAndRespond().
//
// In order to make sure there is only one driver, there must be an _external_ serialization
// point, before the final response is produced, after which only one of the handlers will
// be marked as the driver. For instance, for writes, this serialization point is in
// TransactionDriver, in a synchronized block where a logic such as this one happens (here
// in pseudo-ish code):
//
// {
//   lock_guard<simple_spinlock> l(lock_);
//   if (follower_transaction) {
//     result_tracker_->TrackRpcOrChangeDriver(request_id);
//     continue_with_transaction();
//   } else if (client_transaction) {
//     bool is_still_driver = result_tracker_->IsCurrentDriver(request_id);
//     if (is_still_driver) continue_with_transaction();
//     else abort_transaction();
//   }
// }
//
// This class is thread safe.
class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
 public:
  typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
  static const int NO_HANDLER = -1;
  // Enum returned by TrackRpc that reflects the state of the RPC.
  enum RpcState {
    // The RPC is new.
    NEW,
    // The RPC has previously completed and the same response has been sent
    // to the client.
    COMPLETED,
    // The RPC is currently in-progress and, when it completes, the same response
    // will be sent to the client.
    IN_PROGRESS,
    // The RPC's state is stale, meaning it's older than our per-client garbage
    // collection watermark and we do not recall the original response.
    STALE
  };

  explicit ResultTracker(std::shared_ptr<kudu::MemTracker> mem_tracker);
  ~ResultTracker();

  // Tracks the RPC and returns its current state.
  //
  // If the RpcState == NEW the caller is supposed to actually start executing the RPC.
  // The caller still owns the passed 'response' and 'context'.
  //
  // If the RpcState is anything else all remaining actions will be taken care of internally,
  // i.e. the caller no longer needs to execute the RPC and this takes ownership of the passed
  // 'response' and 'context'.
  RpcState TrackRpc(const RequestIdPB& request_id,
                    google::protobuf::Message* response,
                    RpcContext* context);

  // Used to track RPC attempts which originate from other replicas, and which may race with
  // client originated ones.
  // Tracks the RPC if it is untracked or changes the current driver of this RPC, i.e. sets the
  // attempt number in 'request_id' as the driver of the RPC, if it is tracked and IN_PROGRESS.
  RpcState TrackRpcOrChangeDriver(const RequestIdPB& request_id);

  // Checks if the attempt at an RPC identified by 'request_id' is the current driver of the
  // RPC. That is, if the attempt number in 'request_id' corresponds to the attempt marked
  // as the driver of this RPC, either by initially getting NEW from TrackRpc() or by
  // explicit driver change with ChangeDriver().
  bool IsCurrentDriver(const RequestIdPB& request_id);

  // Records the completion of sucessful operation.
  // This will respond to all RPCs from the same client with the same sequence_number.
  // The response will be stored so that any future retries of this RPC get the same response.
  //
  // Requires that TrackRpc() was called before with the same 'client_id' and
  // 'sequence_number'.
  // Requires that the attempt indentified by 'request_id' is the current driver
  // of the RPC.
  void RecordCompletionAndRespond(const RequestIdPB& request_id,
                                  const google::protobuf::Message* response);

  // Responds to all RPCs identified by 'client_id' and 'sequence_number' with the same response,
  // but doesn't actually store the response.
  // This should be called when the RPC failed validation or if some transient error occurred.
  // Based on the response the client can then decide whether to retry the RPC (which will
  // be treated as a new one) or to give up.
  //
  // Requires that TrackRpc() was called before with the same 'client_id' and
  // 'sequence_number'.
  // Requires that the attempt indentified by 'request_id' is the current driver
  // of the RPC.
  void FailAndRespond(const RequestIdPB& request_id,
                      google::protobuf::Message* response);

  // Overload to match other types of RpcContext::Respond*Failure()
  void FailAndRespond(const RequestIdPB& request_id,
                      ErrorStatusPB_RpcErrorCodePB err, const Status& status);

  // Overload to match other types of RpcContext::Respond*Failure()
  void FailAndRespond(const RequestIdPB& request_id,
                      int error_ext_id, const std::string& message,
                      const google::protobuf::Message& app_error_pb);

  // Start a background thread which periodically runs GCResults().
  // This thread is automatically stopped in the destructor.
  //
  // Must be called at most once.
  void StartGCThread();

  // Runs time-based garbage collection on the results this result tracker is caching.
  // When garbage collection runs, it goes through all ClientStates and:
  // - If a ClientState is older than the 'remember_clients_ttl_ms' flag and no
  //   requests are in progress, GCs the ClientState and all its CompletionRecords.
  // - If a ClientState is newer than the 'remember_clients_ttl_ms' flag, goes
  //   through all CompletionRecords and:
  //   - If the CompletionRecord is older than the 'remember_responses_ttl_secs' flag,
  //     GCs the CompletionRecord and advances the 'stale_before_seq_no' watermark.
  //
  // Typically this is invoked from an internal thread started by 'StartGCThread()'.
  void GCResults();

  std::string ToString();

 private:
  // Information about client originated ongoing RPCs.
  // The lifecycle of 'response' and 'context' is managed by the RPC layer.
  struct OnGoingRpcInfo {
    google::protobuf::Message* response;
    RpcContext* context;
    int64_t handler_attempt_no;

    std::string ToString() const;
  };
  // A completion record for an IN_PROGRESS or COMPLETED RPC.
  struct CompletionRecord {
    CompletionRecord(RpcState state, int64_t driver_attempt_no)
        : state(state),
          driver_attempt_no(driver_attempt_no),
          last_updated(MonoTime::Now()) {
    }

    // The current state of the RPC.
    RpcState state;

    // The attempt number that is/was "driving" this RPC.
    int64_t driver_attempt_no;

    // The timestamp of the last CompletionRecord update.
    MonoTime last_updated;

    // The cached response, if this RPC is in COMPLETED state.
    std::unique_ptr<google::protobuf::Message> response;

    // The set of ongoing RPCs that correspond to this record.
    std::vector<OnGoingRpcInfo> ongoing_rpcs;

    std::string ToString() const;

    // Calculates the memory footprint of this struct.
    int64_t memory_footprint() const {
      return kudu_malloc_usable_size(this)
          + (ongoing_rpcs.capacity() > 0 ? kudu_malloc_usable_size(ongoing_rpcs.data()) : 0)
          + (response.get() != nullptr ? response->SpaceUsed() : 0);
    }
  };

  // The state corresponding to a single client.
  struct ClientState {
    typedef MemTrackerAllocator<
        std::pair<const SequenceNumber,
                  std::unique_ptr<CompletionRecord>>> CompletionRecordMapAllocator;
    typedef std::map<SequenceNumber,
                     std::unique_ptr<CompletionRecord>,
                     std::less<SequenceNumber>,
                     CompletionRecordMapAllocator> CompletionRecordMap;

    explicit ClientState(std::shared_ptr<MemTracker> mem_tracker)
        : stale_before_seq_no(0),
          completion_records(CompletionRecordMap::key_compare(),
                             CompletionRecordMapAllocator(std::move(mem_tracker))) {}

    // The last time we've heard from this client.
    MonoTime last_heard_from;

    // The sequence number of the first response we remember for this client.
    // All sequence numbers before this one are considered STALE.
    SequenceNumber stale_before_seq_no;

    // The (un gc'd) CompletionRecords for this client.
    CompletionRecordMap completion_records;

    // Garbage collects this client's CompletionRecords for which MustGcRecordFunc returns
    // true. We use a lambda here so that we can have a single method that GCs and releases
    // the memory for CompletionRecords based on different policies.
    //
    // 'func' should have the following signature:
    //   bool MyFunction(SequenceNumber seq_no, CompletionRecord* record);
    //
    template<class MustGcRecordFunc>
    void GCCompletionRecords(const std::shared_ptr<kudu::MemTracker>& mem_tracker,
                             MustGcRecordFunc func);

    std::string ToString() const;

    // Calculates the memory footprint of this struct.
    // This calculation is shallow and doesn't account for the memory the nested data
    // structures occupy.
    int64_t memory_footprint() const {
      return kudu_malloc_usable_size(this);
    }
  };

  RpcState TrackRpcUnlocked(const RequestIdPB& request_id,
                            google::protobuf::Message* response,
                            RpcContext* context);

  typedef std::function<void (const OnGoingRpcInfo&)> HandleOngoingRpcFunc;

  // Helper method to handle the multiple overloads of FailAndRespond. Takes a lambda
  // that knows what to do with OnGoingRpcInfo in each individual case.
  void FailAndRespondInternal(const rpc::RequestIdPB& request_id,
                              const HandleOngoingRpcFunc& func);

  CompletionRecord* FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id);
  CompletionRecord* FindCompletionRecordOrDieUnlocked(const RequestIdPB& request_id);
  std::pair<ClientState*, CompletionRecord*> FindClientStateAndCompletionRecordOrNullUnlocked(
      const RequestIdPB& request_id);

  // A handler must handle an RPC attempt if:
  // 1 - It's its own attempt. I.e. it has the same attempt number of the handler.
  // 2 - It's the driver of the RPC and the attempt has no handler (was attached).
  bool MustHandleRpc(int64_t handler_attempt_no,
                     CompletionRecord* completion_record,
                     const OnGoingRpcInfo& ongoing_rpc) {
    if (PREDICT_TRUE(ongoing_rpc.handler_attempt_no == handler_attempt_no)) {
      return true;
    }
    if (completion_record->driver_attempt_no == handler_attempt_no) {
      return ongoing_rpc.handler_attempt_no == NO_HANDLER;
    }
    return false;
  }

  void LogAndTraceAndRespondSuccess(RpcContext* context, const google::protobuf::Message& msg);
  void LogAndTraceFailure(RpcContext* context, const google::protobuf::Message& msg);
  void LogAndTraceFailure(RpcContext* context, ErrorStatusPB_RpcErrorCodePB err,
                          const Status& status);

  std::string ToStringUnlocked() const;

  void RunGCThread();

  // The memory tracker that tracks this ResultTracker's memory consumption.
  std::shared_ptr<kudu::MemTracker> mem_tracker_;

  // Lock that protects access to 'clients_' and to the state contained in each
  // ClientState.
  // TODO consider a per-ClientState lock if we find this too coarse grained.
  simple_spinlock lock_;

  typedef MemTrackerAllocator<std::pair<const std::string,
                                        std::unique_ptr<ClientState>>> ClientStateMapAllocator;
  typedef std::map<std::string,
                   std::unique_ptr<ClientState>,
                   std::less<std::string>,
                   ClientStateMapAllocator> ClientStateMap;

  ClientStateMap clients_;

  // The thread which runs GC, and a latch to stop it.
  scoped_refptr<Thread> gc_thread_;
  CountDownLatch gc_thread_stop_latch_;

  DISALLOW_COPY_AND_ASSIGN(ResultTracker);
};

} // namespace rpc
} // namespace kudu
