#pragma once

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <system_error>

#include "RpcClient.h"
#include "absl/strings/string_view.h"
#include "apache/rocketmq/v1/definition.pb.h"

#include "Client.h"
#include "ClientConfigImpl.h"
#include "ClientManager.h"
#include "ClientResourceBundle.h"
#include "InvocationContext.h"
#include "NameServerResolver.h"
#include "OtlpExporter.h"
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MessageListener.h"
#include "rocketmq/State.h"

ROCKETMQ_NAMESPACE_BEGIN

class ClientImpl : public ClientConfigImpl, virtual public Client {
public:
  explicit ClientImpl(absl::string_view group_name);

  ~ClientImpl() override = default;

  virtual void start();

  virtual void shutdown();

  void getRouteFor(const std::string& topic, const std::function<void(const std::error_code&, TopicRouteDataPtr)>& cb)
      LOCKS_EXCLUDED(inflight_route_requests_mtx_, topic_route_table_mtx_);

  /**
   * Gather collection of endpoints that are reachable from latest topic route
   * table.
   *
   * @param endpoints
   */
  void endpointsInUse(absl::flat_hash_set<std::string>& endpoints) override LOCKS_EXCLUDED(topic_route_table_mtx_);

  void heartbeat() override;

  bool active() override {
    State state = state_.load(std::memory_order_relaxed);
    return State::STARTING == state || State::STARTED == state;
  }

  void onRemoteEndpointRemoval(const std::vector<std::string>& hosts) override LOCKS_EXCLUDED(isolated_endpoints_mtx_);

  void healthCheck() override LOCKS_EXCLUDED(isolated_endpoints_mtx_);

  void schedule(const std::string& task_name, const std::function<void(void)>& task,
                std::chrono::milliseconds delay) override;

  void withNameServerResolver(std::shared_ptr<NameServerResolver> name_server_resolver) {
    name_server_resolver_ = std::move(name_server_resolver);
  }

  /**
   * Expose for test purpose only.
   */
  void state(State state) {
    state_.store(state, std::memory_order_relaxed);
  }

protected:
  ClientManagerPtr client_manager_;
  std::shared_ptr<OtlpExporter> exporter_;
  std::atomic<State> state_;

  absl::flat_hash_map<std::string, TopicRouteDataPtr> topic_route_table_ GUARDED_BY(topic_route_table_mtx_);
  absl::Mutex topic_route_table_mtx_ ACQUIRED_AFTER(inflight_route_requests_mtx_); // protects topic_route_table_

  absl::flat_hash_map<std::string, std::vector<std::function<void(const std::error_code&, const TopicRouteDataPtr&)>>>
      inflight_route_requests_ GUARDED_BY(inflight_route_requests_mtx_);
  absl::Mutex inflight_route_requests_mtx_ ACQUIRED_BEFORE(topic_route_table_mtx_); // Protects inflight_route_requests_
  static const char* UPDATE_ROUTE_TASK_NAME;
  std::uint32_t route_update_handle_{0};

  // Set Name Server Resolver
  std::shared_ptr<NameServerResolver> name_server_resolver_;

  absl::flat_hash_map<std::string, absl::Time> multiplexing_requests_;
  absl::Mutex multiplexing_requests_mtx_;

  absl::flat_hash_set<std::string> isolated_endpoints_ GUARDED_BY(isolated_endpoints_mtx_);
  absl::Mutex isolated_endpoints_mtx_;

  void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_);

  /**
   * Sub-class is supposed to inherit from std::enable_shared_from_this.
   */
  virtual std::shared_ptr<ClientImpl> self() = 0;

  virtual void prepareHeartbeatData(HeartbeatRequest& request) = 0;

  virtual void verifyMessageConsumption(std::string remote_address, std::string command_id, MQMessageExt message);

  /**
   * @brief Execute transaction-state-checker to commit or roll-back the orphan transactional message.
   *
   * It is no-op by default and Producer-subclass is supposed to override it.
   *
   * @param transaction_id
   * @param message
   */
  virtual void resolveOrphanedTransactionalMessage(const std::string& transaction_id, const MQMessageExt& message) {
  }

  /**
   * Concrete publisher/subscriber client is expected to fill other
   * type-specific resources.
   */
  virtual ClientResourceBundle resourceBundle() {
    ClientResourceBundle resource_bundle;
    resource_bundle.client_id = clientId();
    resource_bundle.resource_namespace = resource_namespace_;
    return resource_bundle;
  }

  void setAccessPoint(rmq::Endpoints* endpoints);

  void notifyClientTermination() override;

  void notifyClientTermination(const NotifyClientTerminationRequest& request);

  /**
   * @brief Return application developer provided message listener if this client is of PushConsumer type.
   *
   * By default, it returns nullptr such that error messages are generated and directed to server immediately.
   *
   * @return nullptr by default.
   */
  virtual MessageListener* messageListener() {
    return nullptr;
  }

private:
  /**
   * This is a low-level API that fetches route data from name server through
   * gRPC unary request/response. Once request/response is completed, either
   * timeout or response arrival in time, callback would get invoked.
   * @param topic
   * @param cb
   */
  void fetchRouteFor(const std::string& topic,
                     const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb);

  /**
   * Callback to execute once route data is fetched from name server.
   * @param topic
   * @param route
   */
  void onTopicRouteReady(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route)
      LOCKS_EXCLUDED(inflight_route_requests_mtx_);

  /**
   * Update local cache for the topic. Note, route differences are logged in
   * INFO level since route bears fundamental importance.
   *
   * @param topic
   * @param route
   */
  void updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route)
      LOCKS_EXCLUDED(topic_route_table_mtx_);

  void pollCommand(const std::string& target);

  void onPollCommandResponse(const InvocationContext<PollCommandResponse>* ctx);

  void onHealthCheckResponse(const std::error_code& endpoint, const InvocationContext<HealthCheckResponse>* ctx)
      LOCKS_EXCLUDED(isolated_endpoints_mtx_);

  void doVerify(std::string target, std::string command_id, MQMessageExt message);
};

ROCKETMQ_NAMESPACE_END