/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "RpcClientImpl.h"

#include <chrono>
#include <functional>
#include <sstream>
#include <thread>

#include "ClientManager.h"
#include "ReceiveMessageStreamReader.h"
#include "RpcClient.h"
#include "TelemetryBidiReactor.h"
#include "TlsHelper.h"
#include "absl/time/time.h"

ROCKETMQ_NAMESPACE_BEGIN

using ClientContext = grpc::ClientContext;

void RpcClientImpl::asyncCallback(std::weak_ptr<RpcClient> client, BaseInvocationContext* invocation_context,
                                  grpc::Status status) {

  invocation_context->status = std::move(status);
  std::shared_ptr<RpcClient> stub = client.lock();
  if (!stub) {
    SPDLOG_WARN("RpcClient has destructed. Response Ignored");
    // TODO: execute orphan callback in event-loop thread?
    // invocation_context->onCompletion(false);
    // or
    delete invocation_context;
    return;
  }

  std::weak_ptr<ClientManager> client_manager = stub->clientManager();
  std::shared_ptr<ClientManager> manager = client_manager.lock();
  if (!manager) {
    SPDLOG_WARN("ClientManager has destructed. Response ignored");
    // TODO: execute orphan callback in event-loop thread?
    // invocation_context->onCompletion(false);
    // or
    delete invocation_context;
  }

  auto task = [invocation_context, client] {
    auto ptr = client.lock();
    if (!ptr) {
      // RPC client should have destructed.
      return;
    }
    invocation_context->onCompletion(invocation_context->status.ok());
  };

  // Execute business post-processing in callback thread pool.
  manager->submit(task);
}

void RpcClientImpl::asyncQueryRoute(const QueryRouteRequest& request,
                                    InvocationContext<QueryRouteResponse>* invocation_context) {
  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1);
  stub_->async()->QueryRoute(&invocation_context->context, &request, &invocation_context->response, callback);
}

void RpcClientImpl::asyncSend(const SendMessageRequest& request,
                              InvocationContext<SendMessageResponse>* invocation_context) {
  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1);
  stub_->async()->SendMessage(&invocation_context->context, &request, &invocation_context->response, callback);
}

void RpcClientImpl::asyncQueryAssignment(const QueryAssignmentRequest& request,
                                         InvocationContext<QueryAssignmentResponse>* invocation_context) {
  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1);
  stub_->async()->QueryAssignment(&invocation_context->context, &request, &invocation_context->response, callback);
}

void RpcClientImpl::asyncReceive(const ReceiveMessageRequest& request, std::unique_ptr<ReceiveMessageContext> context) {
  new ReceiveMessageStreamReader(client_manager_, stub_.get(), peer_address_, request, std::move(context));
}

void RpcClientImpl::asyncAck(const AckMessageRequest& request,
                             InvocationContext<AckMessageResponse>* invocation_context) {
  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1);
  stub_->async()->AckMessage(&invocation_context->context, &request, &invocation_context->response, callback);
}

void RpcClientImpl::asyncChangeInvisibleDuration(
    const ChangeInvisibleDurationRequest& request,
    InvocationContext<ChangeInvisibleDurationResponse>* invocation_context) {

  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1);

  stub_->async()->ChangeInvisibleDuration(&invocation_context->context, &request, &invocation_context->response,
                                          callback);
}

void RpcClientImpl::asyncHeartbeat(const HeartbeatRequest& request,
                                   InvocationContext<HeartbeatResponse>* invocation_context) {
  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1);
  stub_->async()->Heartbeat(&invocation_context->context, &request, &invocation_context->response, callback);
}

void RpcClientImpl::asyncEndTransaction(const EndTransactionRequest& request,
                                        InvocationContext<EndTransactionResponse>* invocation_context) {
  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1);
  stub_->async()->EndTransaction(&invocation_context->context, &request, &invocation_context->response, callback);
}

bool RpcClientImpl::ok() const {
  return channel_ && grpc_connectivity_state::GRPC_CHANNEL_SHUTDOWN != channel_->GetState(false);
}

void RpcClientImpl::addMetadata(grpc::ClientContext& context,
                                const absl::flat_hash_map<std::string, std::string>& metadata) {
  for (const auto& entry : metadata) {
    context.AddMetadata(entry.first, entry.second);
  }
}

bool RpcClientImpl::needHeartbeat() {
  return need_heartbeat_;
}

void RpcClientImpl::needHeartbeat(bool need_heartbeat) {
  need_heartbeat_ = need_heartbeat;
}

std::shared_ptr<TelemetryBidiReactor> RpcClientImpl::asyncTelemetry(std::weak_ptr<Client> client) {
  return std::make_shared<TelemetryBidiReactor>(client, stub_.get(), peer_address_);
}

grpc::Status RpcClientImpl::notifyClientTermination(grpc::ClientContext* context,
                                                    const NotifyClientTerminationRequest& request,
                                                    NotifyClientTerminationResponse* response) {
  return stub_->NotifyClientTermination(context, request, response);
}

void RpcClientImpl::asyncForwardMessageToDeadLetterQueue(
    const ForwardMessageToDeadLetterQueueRequest& request,
    InvocationContext<ForwardMessageToDeadLetterQueueResponse>* invocation_context) {
  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1);
  stub_->async()->ForwardMessageToDeadLetterQueue(&invocation_context->context, &request, &invocation_context->response,
                                                  callback);
}

std::weak_ptr<ClientManager> RpcClientImpl::clientManager() {
  return client_manager_;
}

ROCKETMQ_NAMESPACE_END