#include "ClientManagerImpl.h"

#include <atomic>
#include <chrono>
#include <memory>
#include <system_error>
#include <utility>
#include <vector>

#include "ReceiveMessageResult.h"
#include "google/rpc/code.pb.h"

#include "InvocationContext.h"
#include "LogInterceptor.h"
#include "LogInterceptorFactory.h"
#include "LoggerImpl.h"
#include "MessageAccessor.h"
#include "MetadataConstants.h"
#include "MixAll.h"
#include "Partition.h"
#include "Protocol.h"
#include "RpcClient.h"
#include "RpcClientImpl.h"
#include "TlsHelper.h"
#include "UtilAll.h"
#include "grpcpp/create_channel.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/MQMessageExt.h"

ROCKETMQ_NAMESPACE_BEGIN

ClientManagerImpl::ClientManagerImpl(std::string resource_namespace)
    : resource_namespace_(std::move(resource_namespace)), state_(State::CREATED),
      completion_queue_(std::make_shared<CompletionQueue>()),
      callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
      latency_histogram_("Message-Latency", 11) {
  spdlog::set_level(spdlog::level::trace);
  assignLabels(latency_histogram_);
  server_authorization_check_config_ = std::make_shared<grpc::experimental::TlsServerAuthorizationCheckConfig>(
      std::make_shared<TlsServerAuthorizationChecker>());

  // Make use of encryption only at the moment.
  std::vector<grpc::experimental::IdentityKeyCertPair> identity_key_cert_list;
  grpc::experimental::IdentityKeyCertPair pair{};
  pair.private_key = TlsHelper::client_private_key;
  pair.certificate_chain = TlsHelper::client_certificate_chain;

  identity_key_cert_list.emplace_back(pair);
  certificate_provider_ =
      std::make_shared<grpc::experimental::StaticDataCertificateProvider>(TlsHelper::CA, identity_key_cert_list);
  tls_channel_credential_options_.set_server_verification_option(GRPC_TLS_SKIP_ALL_SERVER_VERIFICATION);
  tls_channel_credential_options_.set_certificate_provider(certificate_provider_);
  tls_channel_credential_options_.set_server_authorization_check_config(server_authorization_check_config_);
  tls_channel_credential_options_.watch_root_certs();
  tls_channel_credential_options_.watch_identity_key_cert_pairs();
  channel_credential_ = grpc::experimental::TlsCredentials(tls_channel_credential_options_);


  // Use unlimited receive message size.
  channel_arguments_.SetMaxReceiveMessageSize(-1);

  int max_send_message_size = 1024 * 1024 * 16;
  channel_arguments_.SetMaxSendMessageSize(max_send_message_size);

  /*
   * Keep-alive settings:
   * https://github.com/grpc/grpc/blob/master/doc/keepalive.md
   * Keep-alive ping timeout duration: 3s
   * Keep-alive ping interval, 30s
   */
  channel_arguments_.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
  channel_arguments_.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 3000);
  channel_arguments_.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
  channel_arguments_.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);

  /*
   * If set to zero, disables retry behavior. Otherwise, transparent retries
   * are enabled for all RPCs, and configurable retries are enabled when they
   * are configured via the service config. For details, see:
   *   https://github.com/grpc/proposal/blob/master/A6-client-retries.md
   */
  channel_arguments_.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);

  SPDLOG_INFO("ClientManager[ResourceNamespace={}] created", resource_namespace_);
}

ClientManagerImpl::~ClientManagerImpl() {
  shutdown();
  SPDLOG_INFO("ClientManager[ResourceNamespace={}] destructed", resource_namespace_);
}

void ClientManagerImpl::start() {
  if (State::CREATED != state_.load(std::memory_order_relaxed)) {
    SPDLOG_WARN("Unexpected client instance state: {}", state_.load(std::memory_order_relaxed));
    return;
  }
  state_.store(State::STARTING, std::memory_order_relaxed);

  callback_thread_pool_->start();
  scheduler_.start();

  std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = shared_from_this();

  auto health_check_functor = [client_instance_weak_ptr]() {
    auto client_instance = client_instance_weak_ptr.lock();
    if (client_instance) {
      client_instance->doHealthCheck();
    }
  };
  health_check_task_id_ = scheduler_.schedule(health_check_functor, HEALTH_CHECK_TASK_NAME, std::chrono::seconds(5),
                                              std::chrono::seconds(5));
  auto heartbeat_functor = [client_instance_weak_ptr]() {
    auto client_instance = client_instance_weak_ptr.lock();
    if (client_instance) {
      client_instance->doHeartbeat();
    }
  };
  heartbeat_task_id_ =
      scheduler_.schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));

  completion_queue_thread_ = std::thread(std::bind(&ClientManagerImpl::pollCompletionQueue, this));

  auto stats_functor_ = [client_instance_weak_ptr]() {
    auto client_instance = client_instance_weak_ptr.lock();
    if (client_instance) {
      client_instance->logStats();
    }
  };
  stats_task_id_ =
      scheduler_.schedule(stats_functor_, STATS_TASK_NAME, std::chrono::seconds(0), std::chrono::seconds(10));
  state_.store(State::STARTED, std::memory_order_relaxed);
}

void ClientManagerImpl::shutdown() {
  SPDLOG_DEBUG("Client instance shutdown");
  if (State::STARTED != state_.load(std::memory_order_relaxed)) {
    SPDLOG_WARN("Unexpected client instance state: {}", state_.load(std::memory_order_relaxed));
    return;
  }
  state_.store(STOPPING, std::memory_order_relaxed);

  callback_thread_pool_->shutdown();

  if (health_check_task_id_) {
    scheduler_.cancel(health_check_task_id_);
  }

  if (heartbeat_task_id_) {
    scheduler_.cancel(heartbeat_task_id_);
  }

  if (stats_task_id_) {
    scheduler_.cancel(stats_task_id_);
  }
  scheduler_.shutdown();

  {
    absl::MutexLock lk(&rpc_clients_mtx_);
    rpc_clients_.clear();
    SPDLOG_DEBUG("CompletionQueue of active clients stopped");
  }

  completion_queue_->Shutdown();
  if (completion_queue_thread_.joinable()) {
    completion_queue_thread_.join();
  }
  SPDLOG_DEBUG("Completion queue thread completes OK");

  state_.store(State::STOPPED, std::memory_order_relaxed);
  SPDLOG_DEBUG("Client instance stopped");
}

void ClientManagerImpl::assignLabels(Histogram& histogram) {
  histogram.labels().emplace_back("[000ms~020ms): ");
  histogram.labels().emplace_back("[020ms~040ms): ");
  histogram.labels().emplace_back("[040ms~060ms): ");
  histogram.labels().emplace_back("[060ms~080ms): ");
  histogram.labels().emplace_back("[080ms~100ms): ");
  histogram.labels().emplace_back("[100ms~120ms): ");
  histogram.labels().emplace_back("[120ms~140ms): ");
  histogram.labels().emplace_back("[140ms~160ms): ");
  histogram.labels().emplace_back("[160ms~180ms): ");
  histogram.labels().emplace_back("[180ms~200ms): ");
  histogram.labels().emplace_back("[200ms~inf): ");
}

void ClientManagerImpl::healthCheck(
    const std::string& target_host, const Metadata& metadata, const HealthCheckRequest& request,
    std::chrono::milliseconds timeout,
    const std::function<void(const std::error_code&, const InvocationContext<HealthCheckResponse>*)>& cb) {
  std::error_code ec;
  auto client = getRpcClient(target_host);
  if (!client) {
    ec = ErrorCode::RequestTimeout;
    cb(ec, nullptr);
    return;
  }

  SPDLOG_DEBUG("Prepare to send health-check to {}. Request: {}", target_host, request.DebugString());

  auto invocation_context = new InvocationContext<HealthCheckResponse>();
  invocation_context->task_name = fmt::format("HealthCheck to {}", target_host);
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);

  for (const auto& entry : metadata) {
    invocation_context->context.AddMetadata(entry.first, entry.second);
  }

  auto callback = [cb](const InvocationContext<HealthCheckResponse>* ctx) {
    std::error_code ec;
    if (!ctx->status.ok()) {
      ec = ErrorCode::RequestTimeout;
      cb(ec, ctx);
      return;
    }

    const auto& common = ctx->response.common();
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        cb(ec, ctx);
      } break;
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}", common.status().message());
        ec = ErrorCode::Unauthorized;
        cb(ec, ctx);
      } break;
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}", common.status().message());
        ec = ErrorCode::Forbidden;
        cb(ec, ctx);
      } break;
      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}", common.status().message());
        ec = ErrorCode::InternalServerError;
        cb(ec, ctx);
      } break;
      default: {
        SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release");
        ec = ErrorCode::NotImplemented;
        cb(ec, ctx);
      } break;
    }
  };

  invocation_context->callback = callback;
  client->asyncHealthCheck(request, invocation_context);
}

void ClientManagerImpl::doHealthCheck() {
  SPDLOG_DEBUG("Start to perform health check for inactive clients");
  if (State::STARTED != state_.load(std::memory_order_relaxed) &&
      State::STARTING != state_.load(std::memory_order_relaxed)) {
    SPDLOG_WARN("Unexpected client instance state={}.", state_.load(std::memory_order_relaxed));
    return;
  }

  auto&& rpc_clients_removed = cleanOfflineRpcClients();

  std::vector<std::shared_ptr<Client>> clients;
  {
    absl::MutexLock lk(&clients_mtx_);
    for (auto& item : clients_) {
      auto client = item.lock();
      if (client && client->active()) {
        clients.emplace_back(std::move(client));
      }
    }
  }

  if (!rpc_clients_removed.empty()) {
    for (auto& client : clients) {
      client->onRemoteEndpointRemoval(rpc_clients_removed);
    }
  }

  for (auto& client : clients) {
    client->healthCheck();
  }
  SPDLOG_DEBUG("Health check completed");
}

std::vector<std::string> ClientManagerImpl::cleanOfflineRpcClients() {
  absl::flat_hash_set<std::string> hosts;
  {
    absl::MutexLock lk(&clients_mtx_);
    for (const auto& item : clients_) {
      std::shared_ptr<Client> client = item.lock();
      if (!client) {
        continue;
      }
      client->endpointsInUse(hosts);
    }
  }

  std::vector<std::string> removed;
  {
    absl::MutexLock lk(&rpc_clients_mtx_);
    for (auto it = rpc_clients_.begin(); it != rpc_clients_.end();) {
      std::string host = it->first;
      if (it->second->needHeartbeat() && !hosts.contains(host)) {
        SPDLOG_INFO("Removed RPC client whose peer is offline. RemoteHost={}", host);
        removed.push_back(host);
        rpc_clients_.erase(it++);
      } else {
        it++;
      }
    }
  }

  return removed;
}

void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata& metadata,
                                  const HeartbeatRequest& request, std::chrono::milliseconds timeout,
                                  const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) {
  SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.DebugString());
  auto client = getRpcClient(target_host, true);
  auto invocation_context = new InvocationContext<HeartbeatResponse>();
  invocation_context->task_name = fmt::format("Heartbeat to {}", target_host);
  invocation_context->remote_address = target_host;
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  auto callback = [cb](const InvocationContext<HeartbeatResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to send heartbeat to target_host={}. gRPC code: {}, message: {}",
                  invocation_context->remote_address, invocation_context->status.error_code(),
                  invocation_context->status.error_message());
      std::error_code ec = ErrorCode::RequestTimeout;
      cb(ec, invocation_context->response);
      return;
    }

    const auto& common = invocation_context->response.common();
    std::error_code ec;
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        cb(ec, invocation_context->response);
      } break;
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}", common.status().message());
        ec = ErrorCode::Unauthorized;
        cb(ec, invocation_context->response);
      } break;
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}", common.status().message());
        ec = ErrorCode::Forbidden;
        cb(ec, invocation_context->response);
      } break;
      case google::rpc::Code::INVALID_ARGUMENT: {
        SPDLOG_WARN("InvalidArgument: {}", common.status().message());
        ec = ErrorCode::BadRequest;
        cb(ec, invocation_context->response);
      } break;
      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}", common.status().message());
        ec = ErrorCode::InternalServerError;
        cb(ec, invocation_context->response);
      } break;
      default: {
        SPDLOG_WARN("NotImplemented: Please upgrade SDK to latest release");
      } break;
    }
  };

  invocation_context->callback = callback;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
  client->asyncHeartbeat(request, invocation_context);
}

void ClientManagerImpl::doHeartbeat() {
  if (State::STARTED != state_.load(std::memory_order_relaxed) &&
      State::STARTING != state_.load(std::memory_order_relaxed)) {
    SPDLOG_WARN("Unexpected client instance state={}.", state_.load(std::memory_order_relaxed));
    return;
  }

  std::vector<std::shared_ptr<Client>> clients;
  {
    absl::MutexLock lk(&clients_mtx_);
    for (const auto& item : clients_) {
      auto client = item.lock();
      if (client && client->active()) {
        clients.emplace_back(std::move(client));
      }
    }
  }

  for (auto& client : clients) {
    client->heartbeat();
  }
}

void ClientManagerImpl::pollCompletionQueue() {
  while (State::STARTED == state_.load(std::memory_order_relaxed) ||
         State::STARTING == state_.load(std::memory_order_relaxed)) {
    bool ok = false;
    void* opaque_invocation_context;
    while (completion_queue_->Next(&opaque_invocation_context, &ok)) {
      auto invocation_context = static_cast<BaseInvocationContext*>(opaque_invocation_context);
      if (!ok) {
        // the call is dead
        SPDLOG_WARN("CompletionQueue#Next assigned ok false, indicating the call is dead");
      }
      auto callback = [invocation_context, ok]() { invocation_context->onCompletion(ok); };
      callback_thread_pool_->submit(callback);
    }
    SPDLOG_INFO("CompletionQueue is fully drained and shut down");
  }
  SPDLOG_INFO("pollCompletionQueue completed and quit");
}

bool ClientManagerImpl::send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
                             SendCallback* cb) {
  assert(cb);
  SPDLOG_DEBUG("Prepare to send message to {} asynchronously", target_host);
  RpcClientSharedPtr client = getRpcClient(target_host);
  // Invocation context will be deleted in its onComplete() method.
  auto invocation_context = new InvocationContext<SendMessageResponse>();
  invocation_context->task_name =
      fmt::format("Send message[] to {}", request.message().system_attribute().message_id(), target_host);
  invocation_context->remote_address = target_host;
  for (const auto& entry : metadata) {
    invocation_context->context.AddMetadata(entry.first, entry.second);
  }

  const std::string& topic = request.message().topic().name();
  std::weak_ptr<ClientManager> client_manager(shared_from_this());
  auto completion_callback = [topic, cb,
                              client_manager](const InvocationContext<SendMessageResponse>* invocation_context) {
    ClientManagerPtr client_manager_ptr = client_manager.lock();
    if (!client_manager_ptr) {
      return;
    }

    if (State::STARTED != client_manager_ptr->state()) {
      // TODO: Would this leak some memroy?
      return;
    }

    const auto& common = invocation_context->response.common();

    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: {}, gRPC error message: {}",
                  invocation_context->remote_address, invocation_context->status.error_code(),
                  invocation_context->status.error_message());
      std::error_code ec = ErrorCode::RequestTimeout;
      cb->onFailure(ec);
      return;
    }

    if (invocation_context->status.ok()) {
      switch (invocation_context->response.common().status().code()) {
        case google::rpc::Code::OK: {
          SendResult send_result;
          send_result.setSendStatus(SendStatus::SEND_OK);
          send_result.setMsgId(invocation_context->response.message_id());
          send_result.setTransactionId(invocation_context->response.transaction_id());
          cb->onSuccess(send_result);
        } break;

        case google::rpc::Code::INVALID_ARGUMENT: {
          SPDLOG_WARN("InvalidArgument: {}", common.status().message());
          std::error_code ec = ErrorCode::BadRequest;
          cb->onFailure(ec);
        } break;
        case google::rpc::Code::UNAUTHENTICATED: {
          SPDLOG_WARN("Unauthenticated: {}", common.status().message());
          std::error_code ec = ErrorCode::Unauthorized;
          cb->onFailure(ec);
        } break;
        case google::rpc::Code::PERMISSION_DENIED: {
          SPDLOG_WARN("PermissionDenied: {}", common.status().message());
          std::error_code ec = ErrorCode::Forbidden;
          cb->onFailure(ec);
        } break;
        case google::rpc::Code::INTERNAL: {
          SPDLOG_WARN("InternalServerError: {}", common.status().message());
          std::error_code ec = ErrorCode::InternalServerError;
          cb->onFailure(ec);
        } break;
        default: {
          SPDLOG_WARN("Unsupported status code. Check and upgrade SDK to the latest");
          std::error_code ec = ErrorCode::NotImplemented;
          cb->onFailure(ec);
        } break;
      }
    }
  };

  invocation_context->callback = completion_callback;
  client->asyncSend(request, invocation_context);
  return true;
}

/**
 * @brief Create a gRPC channel to target host.
 *
 * @param target_host
 * @return std::shared_ptr<grpc::Channel>
 */
std::shared_ptr<grpc::Channel> ClientManagerImpl::createChannel(const std::string& target_host) {
  std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
  interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
  auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
      target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories));
  return channel;
}

RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_host, bool need_heartbeat) {
  std::shared_ptr<RpcClient> client;
  {
    absl::MutexLock lock(&rpc_clients_mtx_);
    auto search = rpc_clients_.find(target_host);
    if (search == rpc_clients_.end() || !search->second->ok()) {
      if (search == rpc_clients_.end()) {
        SPDLOG_INFO("Create a RPC client to {}", target_host.data());
      } else if (!search->second->ok()) {
        SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host);
      }
      std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
      interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
      auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
          target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories));
      client = std::make_shared<RpcClientImpl>(completion_queue_, channel, need_heartbeat);
      rpc_clients_.insert_or_assign(target_host, client);
    } else {
      client = search->second;
    }
  }

  if (need_heartbeat && !client->needHeartbeat()) {
    client->needHeartbeat(need_heartbeat);
  }

  return client;
}

void ClientManagerImpl::addRpcClient(const std::string& target_host, const RpcClientSharedPtr& client) {
  {
    absl::MutexLock lock(&rpc_clients_mtx_);
    rpc_clients_.insert_or_assign(target_host, client);
  }
}

void ClientManagerImpl::cleanRpcClients() {
  absl::MutexLock lk(&rpc_clients_mtx_);
  rpc_clients_.clear();
}

SendResult ClientManagerImpl::processSendResponse(const MQMessageQueue& message_queue,
                                                  const SendMessageResponse& response) {
  if (google::rpc::Code::OK != response.common().status().code()) {
    THROW_MQ_EXCEPTION(MQClientException, response.common().DebugString(), response.common().status().code());
  }
  SendResult send_result;
  send_result.setSendStatus(SendStatus::SEND_OK);
  send_result.setMsgId(response.message_id());
  send_result.setQueueOffset(-1);
  send_result.setMessageQueue(message_queue);
  send_result.setTransactionId(response.transaction_id());
  return send_result;
}

void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
  absl::MutexLock lk(&clients_mtx_);
  clients_.emplace_back(std::move(client));
}

void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metadata& metadata,
                                     const QueryRouteRequest& request, std::chrono::milliseconds timeout,
                                     const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) {

  RpcClientSharedPtr client = getRpcClient(target_host, false);
  if (!client) {
    SPDLOG_WARN("Failed to create RPC client for name server[host={}]", target_host);
    std::error_code ec = ErrorCode::RequestTimeout;
    cb(ec, nullptr);
    return;
  }

  auto invocation_context = new InvocationContext<QueryRouteResponse>();
  invocation_context->task_name = fmt::format("Query route of topic={} from {}", request.topic().name(), target_host);
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  auto callback = [cb](const InvocationContext<QueryRouteResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to send query route request to server[host={}]. Reason: {}",
                  invocation_context->remote_address, invocation_context->status.error_message());
      std::error_code ec = ErrorCode::RequestTimeout;
      cb(ec, nullptr);
      return;
    }

    std::error_code ec;
    const auto& common = invocation_context->response.common();
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        auto& partitions = invocation_context->response.partitions();
        std::vector<Partition> topic_partitions;
        for (const auto& partition : partitions) {
          Topic t(partition.topic().resource_namespace(), partition.topic().name());

          auto& broker = partition.broker();
          AddressScheme scheme = AddressScheme::IPv4;
          switch (broker.endpoints().scheme()) {
            case rmq::AddressScheme::IPv4:
              scheme = AddressScheme::IPv4;
              break;
            case rmq::AddressScheme::IPv6:
              scheme = AddressScheme::IPv6;
              break;
            case rmq::AddressScheme::DOMAIN_NAME:
              scheme = AddressScheme::DOMAIN_NAME;
              break;
            default:
              break;
          }

          std::vector<Address> addresses;
          for (const auto& address : broker.endpoints().addresses()) {
            addresses.emplace_back(Address{address.host(), address.port()});
          }
          ServiceAddress service_address(scheme, addresses);
          Broker b(partition.broker().name(), partition.broker().id(), service_address);

          Permission permission = Permission::READ_WRITE;
          switch (partition.permission()) {
            case rmq::Permission::READ:
              permission = Permission::READ;
              break;

            case rmq::Permission::WRITE:
              permission = Permission::WRITE;
              break;
            case rmq::Permission::READ_WRITE:
              permission = Permission::READ_WRITE;
              break;
            default:
              break;
          }
          Partition topic_partition(t, partition.id(), permission, std::move(b));
          topic_partitions.emplace_back(std::move(topic_partition));
        }
        auto ptr =
            std::make_shared<TopicRouteData>(std::move(topic_partitions), invocation_context->response.DebugString());
        cb(ec, ptr);
      } break;
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
        cb(ec, nullptr);
      } break;
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
        cb(ec, nullptr);
      } break;
      case google::rpc::Code::INVALID_ARGUMENT: {
        SPDLOG_WARN("InvalidArgument: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::BadRequest;
        cb(ec, nullptr);
      } break;
      case google::rpc::Code::NOT_FOUND: {
        SPDLOG_WARN("NotFound: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::NotFound;
        cb(ec, nullptr);
      } break;
      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
        cb(ec, nullptr);
      } break;
      default: {
        SPDLOG_WARN("NotImplement: Please upgrade to latest SDK release. Host={}", invocation_context->remote_address);
        ec = ErrorCode::NotImplemented;
        cb(ec, nullptr);
      } break;
    }
  };
  invocation_context->callback = callback;
  client->asyncQueryRoute(request, invocation_context);
}

void ClientManagerImpl::queryAssignment(
    const std::string& target, const Metadata& metadata, const QueryAssignmentRequest& request,
    std::chrono::milliseconds timeout,
    const std::function<void(const std::error_code&, const QueryAssignmentResponse&)>& cb) {
  SPDLOG_DEBUG("Prepare to send query assignment request to broker[address={}]", target);
  std::shared_ptr<RpcClient> client = getRpcClient(target);

  auto callback = [&, cb](const InvocationContext<QueryAssignmentResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to query assignment. Reason: {}", invocation_context->status.error_message());
      std::error_code ec = ErrorCode::RequestTimeout;
      cb(ec, invocation_context->response);
      return;
    }

    const auto& common = invocation_context->response.common();
    std::error_code ec;
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        SPDLOG_DEBUG("Query assignment OK");
      } break;
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
      } break;
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
      } break;
      case google::rpc::Code::INVALID_ARGUMENT: {
        SPDLOG_WARN("InvalidArgument: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::BadRequest;
      } break;
      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
      } break;
      default: {
        SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release. Host={}",
                    invocation_context->remote_address);
        ec = ErrorCode::NotImplemented;
      } break;
    }
    cb(ec, invocation_context->response);
  };

  auto invocation_context = new InvocationContext<QueryAssignmentResponse>();
  invocation_context->task_name = fmt::format("QueryAssignment from {}", target);
  invocation_context->remote_address = target;
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
  invocation_context->callback = callback;
  client->asyncQueryAssignment(request, invocation_context);
}

void ClientManagerImpl::receiveMessage(const std::string& target_host, const Metadata& metadata,
                                       const ReceiveMessageRequest& request, std::chrono::milliseconds timeout,
                                       const std::shared_ptr<ReceiveMessageCallback>& cb) {
  SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: {}", target_host, request.DebugString());
  RpcClientSharedPtr client = getRpcClient(target_host);

  auto invocation_context = new InvocationContext<ReceiveMessageResponse>();
  invocation_context->task_name = fmt::format("ReceiveMessage from queue[{}-{}-{}-{}], host={}", request.group().name(),
                                              request.partition().topic().name(), request.partition().broker().name(),
                                              request.partition().id(), target_host);
  invocation_context->remote_address = target_host;
  if (!metadata.empty()) {
    for (const auto& item : metadata) {
      invocation_context->context.AddMetadata(item.first, item.second);
    }
  }
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);

  auto callback = [this, cb](const InvocationContext<ReceiveMessageResponse>* invocation_context) {
    std::error_code ec;
    ReceiveMessageResult result;

    // Handle network error.
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to pop messages through gRPC from {}, gRPC code: {}, gRPC error message: {}",
                  invocation_context->remote_address, invocation_context->status.error_code(),
                  invocation_context->status.error_message());
      ec = ErrorCode::RequestTimeout;
      cb->onCompletion(ec, result);
      return;
    }

    // Handle application layer logic
    result.source_host = invocation_context->remote_address;
    const auto& common = invocation_context->response.common();
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        SPDLOG_TRACE("ReceivedMessage Resonse: {}, host={}", invocation_context->response.DebugString(),
                     invocation_context->remote_address);
        for (auto& item : invocation_context->response.messages()) {
          MQMessageExt message_ext;
          MessageAccessor::setTargetEndpoint(message_ext, invocation_context->remote_address);
          if (wrapMessage(item, message_ext)) {
            result.messages.emplace_back(message_ext);
          } else {
            SPDLOG_WARN("A message fails to pass body checksum validation. Skip processing it.");
          }
        }
        break;
      }
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
      } break;

      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
      } break;

      case google::rpc::Code::INVALID_ARGUMENT: {
        SPDLOG_WARN("InvalidArgument: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::BadRequest;
      } break;

      case google::rpc::Code::DEADLINE_EXCEEDED: {
        SPDLOG_WARN("DeadlineExceeded: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::GatewayTimeout;
      } break;

      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("IntervalServerError: {}. Host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
      } break;
      default: {
        SPDLOG_WARN("Unsupported code. Please upgrade to use the latest release. Host={}",
                    invocation_context->remote_address);
        ec = ErrorCode::NotImplemented;
      } break;
    }
    cb->onCompletion(ec, result);
  };
  invocation_context->callback = callback;
  client->asyncReceive(request, invocation_context);
}

State ClientManagerImpl::state() const {
  return state_.load(std::memory_order_relaxed);
}

bool ClientManagerImpl::wrapMessage(const rmq::Message& item, MQMessageExt& message_ext) {
  assert(item.topic().resource_namespace() == resource_namespace_);

  // base
  message_ext.setTopic(item.topic().name());

  const auto& system_attributes = item.system_attribute();

  // Receipt-handle
  MessageAccessor::setReceiptHandle(message_ext, system_attributes.receipt_handle());

  // Tag
  message_ext.setTags(system_attributes.tag());

  // Keys
  std::vector<std::string> keys;
  for (const auto& key : system_attributes.keys()) {
    keys.push_back(key);
  }
  message_ext.setKeys(keys);

  // Message-Id
  MessageAccessor::setMessageId(message_ext, system_attributes.message_id());

  // Validate body digest
  const rmq::Digest& digest = system_attributes.body_digest();
  bool body_digest_match = false;
  if (item.body().empty()) {
    SPDLOG_WARN("Body of message[topic={}, msgId={}] is empty", item.topic().name(),
                item.system_attribute().message_id());
    body_digest_match = true;
  } else {
    switch (digest.type()) {
      case rmq::DigestType::CRC32: {
        std::string checksum;
        bool success = MixAll::crc32(item.body(), checksum);
        if (success) {
          body_digest_match = (digest.checksum() == checksum);
          if (body_digest_match) {
            SPDLOG_DEBUG("Message body CRC32 checksum validation passed.");
          } else {
            SPDLOG_WARN("Body CRC32 checksum validation failed. Actual: {}, expect: {}", checksum, digest.checksum());
          }
        } else {
          SPDLOG_WARN("Failed to calculate CRC32 checksum. Skip.");
        }
        break;
      }
      case rmq::DigestType::MD5: {
        std::string checksum;
        bool success = MixAll::md5(item.body(), checksum);
        if (success) {
          body_digest_match = (digest.checksum() == checksum);
          if (body_digest_match) {
            SPDLOG_DEBUG("Body of message[{}] MD5 checksum validation passed.", message_ext.getMsgId());
          } else {
            SPDLOG_WARN("Body of message[{}] MD5 checksum validation failed. Expect: {}, Actual: {}",
                        message_ext.getMsgId(), digest.checksum(), checksum);
          }
        } else {
          SPDLOG_WARN("Failed to calculate MD5 digest. Skip.");
          body_digest_match = true;
        }
        break;
      }
      case rmq::DigestType::SHA1: {
        std::string checksum;
        bool success = MixAll::sha1(item.body(), checksum);
        if (success) {
          body_digest_match = (checksum == digest.checksum());
          if (body_digest_match) {
            SPDLOG_DEBUG("Body of message[{}] SHA1 checksum validation passed", message_ext.getMsgId());
          } else {
            SPDLOG_WARN("Body of message[{}] SHA1 checksum validation failed. Expect: {}, Actual: {}",
                        message_ext.getMsgId(), digest.checksum(), checksum);
          }
        } else {
          SPDLOG_WARN("Failed to calculate SHA1 digest for message[{}]. Skip.", message_ext.getMsgId());
        }
        break;
      }
      default: {
        SPDLOG_WARN("Unsupported message body digest algorithm");
        body_digest_match = true;
        break;
      }
    }
  }

  if (!body_digest_match) {
    SPDLOG_WARN("Message body checksum failed. MsgId={}", system_attributes.message_id());
    // TODO: NACK it immediately
    return false;
  }

  // Body encoding
  switch (system_attributes.body_encoding()) {
    case rmq::Encoding::GZIP: {
      std::string uncompressed;
      UtilAll::uncompress(item.body(), uncompressed);
      message_ext.setBody(uncompressed);
      break;
    }
    case rmq::Encoding::IDENTITY: {
      message_ext.setBody(item.body());
      break;
    }
    default: {
      SPDLOG_WARN("Unsupported encoding algorithm");
      break;
    }
  }

  timeval tv{};

  // Message-type
  MessageType message_type;
  switch (system_attributes.message_type()) {
    case rmq::MessageType::NORMAL:
      message_type = MessageType::NORMAL;
      break;
    case rmq::MessageType::FIFO:
      message_type = MessageType::FIFO;
      break;
    case rmq::MessageType::DELAY:
      message_type = MessageType::DELAY;
      break;
    case rmq::MessageType::TRANSACTION:
      message_type = MessageType::TRANSACTION;
      break;
    default:
      SPDLOG_WARN("Unknown message type. Treat it as normal message");
      message_type = MessageType::NORMAL;
      break;
  }
  MessageAccessor::setMessageType(message_ext, message_type);

  // Born-timestamp
  if (system_attributes.has_born_timestamp()) {
    tv.tv_sec = system_attributes.born_timestamp().seconds();
    tv.tv_usec = system_attributes.born_timestamp().nanos() / 1000;
    auto born_timestamp = absl::TimeFromTimeval(tv);
    MessageAccessor::setBornTimestamp(message_ext, born_timestamp);
  }

  // Born-host
  MessageAccessor::setBornHost(message_ext, system_attributes.born_host());

  // Store-timestamp
  if (system_attributes.has_store_timestamp()) {
    tv.tv_sec = system_attributes.store_timestamp().seconds();
    tv.tv_usec = system_attributes.store_timestamp().nanos() / 1000;
    MessageAccessor::setStoreTimestamp(message_ext, absl::TimeFromTimeval(tv));
  }

  // Store-host
  MessageAccessor::setStoreHost(message_ext, system_attributes.store_host());

  // Process one-of: delivery-timestamp and delay-level.
  switch (system_attributes.timed_delivery_case()) {
    case rmq::SystemAttribute::TimedDeliveryCase::kDelayLevel: {
      message_ext.setDelayTimeLevel(system_attributes.delay_level());
      break;
    }

    case rmq::SystemAttribute::TimedDeliveryCase::kDeliveryTimestamp: {
      tv.tv_sec = system_attributes.delivery_timestamp().seconds();
      tv.tv_usec = system_attributes.delivery_timestamp().nanos();
      MessageAccessor::setDeliveryTimestamp(message_ext, absl::TimeFromTimeval(tv));
      break;
    }

    default:
      break;
  }

  // Partition-id
  MessageAccessor::setQueueId(message_ext, system_attributes.partition_id());

  // Partition-offset
  MessageAccessor::setQueueOffset(message_ext, system_attributes.partition_offset());

  // Invisible-period
  if (system_attributes.has_invisible_period()) {
    absl::Duration invisible_period = absl::Seconds(system_attributes.invisible_period().seconds()) +
                                      absl::Nanoseconds(system_attributes.invisible_period().nanos());
    MessageAccessor::setInvisiblePeriod(message_ext, invisible_period);
  }

  // Delivery attempt
  MessageAccessor::setDeliveryAttempt(message_ext, system_attributes.delivery_attempt());

  // Trace-context
  MessageAccessor::setTraceContext(message_ext, system_attributes.trace_context());

  // Decoded Time-Point
  MessageAccessor::setDecodedTimestamp(message_ext, absl::Now());

  // User-properties
  std::map<std::string, std::string> properties;
  for (const auto& it : item.user_attribute()) {
    properties.insert(std::make_pair(it.first, it.second));
  }
  message_ext.setProperties(properties);

  // Extension
  {
    auto elapsed = static_cast<int32_t>(absl::ToUnixMillis(absl::Now()) - message_ext.getStoreTimestamp());
    if (elapsed >= 0) {
      latency_histogram_.countIn(elapsed / 20);
    }
  }
  return true;
}

Scheduler& ClientManagerImpl::getScheduler() {
  return scheduler_;
}

void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata, const AckMessageRequest& request,
                            std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) {
  std::string target_host(target.data(), target.length());
  SPDLOG_DEBUG("Prepare to ack message against {} asynchronously. AckMessageRequest: {}", target_host,
               request.DebugString());
  RpcClientSharedPtr client = getRpcClient(target_host);

  auto invocation_context = new InvocationContext<AckMessageResponse>();
  invocation_context->task_name = fmt::format("Ack message[{}] against {}", request.message_id(), target);
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);

  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  // TODO: Use capture by move and pass-by-value paradigm when C++ 14 is available.
  auto callback = [request, cb](const InvocationContext<AckMessageResponse>* invocation_context) {
    std::error_code ec;
    if (!invocation_context->status.ok()) {
      ec = ErrorCode::RequestTimeout;
      cb(ec);
      return;
    }

    const auto& common = invocation_context->response.common();
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        SPDLOG_DEBUG("Ack OK. host={}", invocation_context->remote_address);
      } break;
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
      } break;
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
      } break;
      case google::rpc::Code::INVALID_ARGUMENT: {
        SPDLOG_WARN("InvalidArgument: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::BadRequest;
      } break;
      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
      } break;
      default: {
        SPDLOG_WARN("NotImplement: please upgrade SDK to latest release. host={}", invocation_context->remote_address);
        ec = ErrorCode::NotImplemented;
      } break;
    }
    cb(ec);
  };
  invocation_context->callback = callback;
  client->asyncAck(request, invocation_context);
}

void ClientManagerImpl::nack(const std::string& target_host, const Metadata& metadata,
                             const NackMessageRequest& request, std::chrono::milliseconds timeout,
                             const std::function<void(const std::error_code&)>& completion_callback) {
  RpcClientSharedPtr client = getRpcClient(target_host);
  assert(client);
  auto invocation_context = new InvocationContext<NackMessageResponse>();
  invocation_context->task_name = fmt::format("Nack Message[{}] against {}", request.message_id(), target_host);
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);

  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  auto callback = [completion_callback](const InvocationContext<NackMessageResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to write Nack request to wire. gRPC-code: {}, gRPC-message: {}",
                  invocation_context->status.error_code(), invocation_context->status.error_message());
      std::error_code ec = ErrorCode::RequestTimeout;
      completion_callback(ec);
      return;
    }

    std::error_code ec;
    const auto& common = invocation_context->response.common();
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        SPDLOG_DEBUG("Nack to {} OK", invocation_context->remote_address);
        break;
      };
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
        break;
      }
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
        break;
      }
      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
        break;
      }
      default: {
        SPDLOG_WARN("NotImplemented: Please upgrade to latest SDK, host={}", invocation_context->remote_address);
        ec = ErrorCode::NotImplemented;
        break;
      }
    }
    completion_callback(ec);
  };
  invocation_context->callback = callback;
  client->asyncNack(request, invocation_context);
}

void ClientManagerImpl::endTransaction(
    const std::string& target_host, const Metadata& metadata, const EndTransactionRequest& request,
    std::chrono::milliseconds timeout,
    const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) {
  RpcClientSharedPtr client = getRpcClient(target_host);
  if (!client) {
    SPDLOG_WARN("No RPC client for {}", target_host);
    EndTransactionResponse response;
    std::error_code ec = ErrorCode::BadRequest;
    cb(ec, response);
    return;
  }

  SPDLOG_DEBUG("Prepare to endTransaction. TargetHost={}, Request: {}", target_host.data(), request.DebugString());

  auto invocation_context = new InvocationContext<EndTransactionResponse>();
  invocation_context->task_name = fmt::format("End transaction[{}] of message[] against {}", request.transaction_id(),
                                              request.message_id(), target_host);
  invocation_context->remote_address = target_host;
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  // Set RPC deadline.
  auto deadline = std::chrono::system_clock::now() + timeout;
  invocation_context->context.set_deadline(deadline);

  auto callback = [target_host, cb](const InvocationContext<EndTransactionResponse>* invocation_context) {
    std::error_code ec;
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to write EndTransaction to wire. gRPC-code: {}, gRPC-message: {}, host={}",
                  invocation_context->status.error_code(), invocation_context->status.error_message(),
                  invocation_context->remote_address);
      ec = ErrorCode::BadRequest;
      cb(ec, invocation_context->response);
      return;
    }

    const auto& common = invocation_context->response.common();
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        SPDLOG_DEBUG("endTransaction completed OK. Response: {}, host={}", invocation_context->response.DebugString(),
                     invocation_context->remote_address);
      } break;
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
      } break;
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
      } break;
      case google::rpc::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
      } break;
      default: {
        SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release. {}, host={}", common.status().message(),
                    invocation_context->remote_address);
        ec = ErrorCode::NotImplemented;
      }
    }
    cb(ec, invocation_context->response);
  };

  invocation_context->callback = callback;
  client->asyncEndTransaction(request, invocation_context);
}

void ClientManagerImpl::multiplexingCall(
    const std::string& target_host, const Metadata& metadata, const MultiplexingRequest& request,
    std::chrono::milliseconds timeout, const std::function<void(const InvocationContext<MultiplexingResponse>*)>& cb) {
  RpcClientSharedPtr client = getRpcClient(target_host);
  if (!client) {
    SPDLOG_WARN("No RPC client for {}", target_host);
    cb(nullptr);
    return;
  }

  SPDLOG_DEBUG("Prepare to endTransaction. TargetHost={}, Request: {}", target_host.data(), request.DebugString());

  auto invocation_context = new InvocationContext<MultiplexingResponse>();
  invocation_context->remote_address = target_host;
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  // Set RPC deadline.
  auto deadline = std::chrono::system_clock::now() + timeout;
  invocation_context->context.set_deadline(deadline);

  auto callback = [cb](const InvocationContext<MultiplexingResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to apply multiplexing-call. TargetHost={}, gRPC statusCode={}, errorMessage={}",
                  invocation_context->remote_address, invocation_context->status.error_message(),
                  invocation_context->status.error_message());
      cb(invocation_context);
      return;
    }

    SPDLOG_DEBUG("endTransaction completed OK. Response: {}", invocation_context->response.DebugString());
    cb(invocation_context);
  };
  invocation_context->callback = callback;
  client->asyncMultiplexingCall(request, invocation_context);
}

void ClientManagerImpl::queryOffset(const std::string& target_host, const Metadata& metadata,
                                    const QueryOffsetRequest& request, std::chrono::milliseconds timeout,
                                    const std::function<void(const std::error_code&, const QueryOffsetResponse&)>& cb) {
  auto client = getRpcClient(target_host);
  std::error_code ec;
  if (!client) {
    SPDLOG_WARN("Failed to get/create RPC client for {}", target_host);
    ec = ErrorCode::RequestTimeout;
    QueryOffsetResponse response;
    cb(ec, response);
    return;
  }

  auto invocation_context = new InvocationContext<QueryOffsetResponse>();
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);

  for (const auto& entry : metadata) {
    invocation_context->context.AddMetadata(entry.first, entry.second);
  }

  auto callback = [cb](const InvocationContext<QueryOffsetResponse>* invocation_context) {
    std::error_code ec;

    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to write QueryOffset request to wire. gRPC-code: {}, gRPC-message: {}, host={}",
                  invocation_context->status.error_code(), invocation_context->status.error_message(),
                  invocation_context->remote_address);
      ec = ErrorCode::RequestTimeout;
      cb(ec, invocation_context->response);
      return;
    }

    const auto& common = invocation_context->response.common();
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        SPDLOG_DEBUG("Query offset from server[host={}] OK", invocation_context->remote_address);
        cb(ec, invocation_context->response);
      } break;
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
        cb(ec, invocation_context->response);
      } break;
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
        cb(ec, invocation_context->response);
      } break;
      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
        cb(ec, invocation_context->response);
      } break;
      default: {
        SPDLOG_WARN("NotImplemented: please upgrade SDK to the latest release. host={}",
                    invocation_context->remote_address);
        ec = ErrorCode::NotImplemented;
        cb(ec, invocation_context->response);
      }
    }
  };
  invocation_context->callback = callback;
  client->asyncQueryOffset(request, invocation_context);
}

void ClientManagerImpl::pullMessage(
    const std::string& target_host, const Metadata& metadata, const PullMessageRequest& request,
    std::chrono::milliseconds timeout,
    const std::function<void(const std::error_code&, const ReceiveMessageResult&)>& cb) {
  SPDLOG_DEBUG("PullMessage Request: {}, target_host={}", request.DebugString(), target_host);
  auto client = getRpcClient(target_host);
  auto invocation_context = new InvocationContext<PullMessageResponse>();
  invocation_context->task_name = fmt::format("PullMessage for queue[{}-{}-{}-{}] from {}", request.group().name(),
                                              request.partition().topic().name(), request.partition().broker().name(),
                                              request.partition().id(), target_host);
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  auto callback = [cb, this](const InvocationContext<PullMessageResponse>* invocation_context) {
    std::error_code ec;
    ReceiveMessageResult result;
    result.source_host = invocation_context->remote_address;
    // Handle network issue.
    if (!invocation_context->status.ok()) {
      ec = ErrorCode::RequestTimeout;
      cb(ec, result);
      return;
    }

    // Handle application layer logic: map status::code to corresponding error_code.
    const auto& common = invocation_context->response.common();
    result.min_offset = invocation_context->response.min_offset();
    result.next_offset = invocation_context->response.next_offset();
    result.max_offset = invocation_context->response.max_offset();
    switch (common.status().code()) {
      case google::rpc::Code::OK: {
        SPDLOG_TRACE("Received PullMessage Response: {}, host={}", invocation_context->response.DebugString(),
                     invocation_context->remote_address);
        for (const auto& item : invocation_context->response.messages()) {
          MQMessageExt message;
          if (!wrapMessage(item, message)) {
            return;
          }
          result.messages.emplace_back(message);
        }
      } break;
      case google::rpc::Code::PERMISSION_DENIED: {
        SPDLOG_WARN("PermissionDenied: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Forbidden;
      } break;
      case google::rpc::Code::UNAUTHENTICATED: {
        SPDLOG_WARN("Unauthenticated: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::Unauthorized;
      } break;
      case google::rpc::Code::NOT_FOUND: {
        SPDLOG_WARN("NotFound: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::NotFound;
        break;
      }
      case google::rpc::Code::DEADLINE_EXCEEDED: {
        SPDLOG_WARN("DeadlineExceeded: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::GatewayTimeout;
      } break;
      case google::rpc::Code::INVALID_ARGUMENT: {
        SPDLOG_WARN("InvalidArgument: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::BadRequest;
      } break;
      case google::rpc::Code::FAILED_PRECONDITION: {
        SPDLOG_WARN("FailedPrecondition: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::PreconditionRequired;
      } break;
      case google::rpc::Code::INTERNAL: {
        SPDLOG_WARN("InternalServerError: {}, host={}", common.status().message(), invocation_context->remote_address);
        ec = ErrorCode::InternalServerError;
      } break;
      default: {
        SPDLOG_WARN("Unimplemented: Please upgrade to use latest SDK release, host={}",
                    invocation_context->remote_address);
        ec = ErrorCode::NotImplemented;
      } break;
    }
    cb(ec, result);
  };

  invocation_context->callback = callback;
  client->asyncPull(request, invocation_context);
}

void ClientManagerImpl::forwardMessageToDeadLetterQueue(
    const std::string& target_host, const Metadata& metadata, const ForwardMessageToDeadLetterQueueRequest& request,
    std::chrono::milliseconds timeout,
    const std::function<void(const InvocationContext<ForwardMessageToDeadLetterQueueResponse>*)>& cb) {
  SPDLOG_DEBUG("ForwardMessageToDeadLetterQueue Request: {}", request.DebugString());
  auto client = getRpcClient(target_host);
  auto invocation_context = new InvocationContext<ForwardMessageToDeadLetterQueueResponse>();
  invocation_context->task_name =
      fmt::format("Forward message[{}] to DLQ against {}", request.message_id(), target_host);
  invocation_context->remote_address = target_host;
  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);

  for (const auto& item : metadata) {
    invocation_context->context.AddMetadata(item.first, item.second);
  }

  auto callback = [cb](const InvocationContext<ForwardMessageToDeadLetterQueueResponse>* invocation_context) {
    if (!invocation_context->status.ok()) {
      SPDLOG_WARN("Failed to transmit SendMessageToDeadLetterQueueRequest to host={}",
                  invocation_context->remote_address);
      cb(invocation_context);
      return;
    }

    SPDLOG_DEBUG("Received forwardToDeadLetterQueue response from server[host={}]", invocation_context->remote_address);
    cb(invocation_context);
  };
  invocation_context->callback = callback;
  client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
}

std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, const Metadata& metadata,
                                                           const NotifyClientTerminationRequest& request,
                                                           std::chrono::milliseconds timeout) {
  std::error_code ec;
  auto client = getRpcClient(target_host);
  if (!client) {
    SPDLOG_WARN("Failed to create RpcClient for host={}", target_host);
    ec = ErrorCode::RequestTimeout;
    return ec;
  }

  grpc::ClientContext context;
  context.set_deadline(std::chrono::system_clock::now() + timeout);
  for (const auto& item : metadata) {
    context.AddMetadata(item.first, item.second);
  }

  SPDLOG_DEBUG("NotifyClientTermination request: {}", request.DebugString());

  NotifyClientTerminationResponse response;
  grpc::Status status = client->notifyClientTermination(&context, request, &response);
  if (!status.ok()) {
    SPDLOG_WARN("NotifyClientTermination failed. gRPC-code={}, gRPC-message={}, host={}", status.error_code(),
                status.error_message(), target_host);
    ec = ErrorCode::RequestTimeout;
    return ec;
  }

  const auto& common = response.common();

  switch (common.status().code()) {
    case google::rpc::Code::OK: {
      SPDLOG_DEBUG("NotifyClientTermination OK. host={}", target_host);
      break;
    }
    case google::rpc::Code::INTERNAL: {
      SPDLOG_WARN("InternalServerError: Cause={}, host={}", common.status().message(), target_host);
      ec = ErrorCode::InternalServerError;
      break;
    }
    case google::rpc::Code::UNAUTHENTICATED: {
      SPDLOG_WARN("Unauthenticated: Cause={}, host={}", common.status().message(), target_host);
      ec = ErrorCode::Unauthorized;
      break;
    }
    case google::rpc::Code::PERMISSION_DENIED: {
      SPDLOG_WARN("PermissionDenied: Cause={}, host={}", common.status().message(), target_host);
      ec = ErrorCode::Forbidden;
      break;
    }
    default: {
      SPDLOG_WARN("NotImplemented. Please upgrade to latest SDK release. host={}", target_host);
      ec = ErrorCode::NotImplemented;
      break;
    }
  }
  return ec;
}

void ClientManagerImpl::logStats() {
  std::string stats;
  latency_histogram_.reportAndReset(stats);
  SPDLOG_INFO("{}", stats);
}

const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task";
const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task";
const char* ClientManagerImpl::HEALTH_CHECK_TASK_NAME = "health-check-task";

ROCKETMQ_NAMESPACE_END