/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "RemotingCommand.h"

#include <cstring>  // std::memcpy

#include <algorithm>  // std::move
#include <atomic>     // std::atomic
#include <limits>     // std::numeric_limits

#include "ByteBuffer.hpp"
#include "ByteOrder.h"
#include "Logging.h"
#include "MQVersion.h"
#include "RemotingSerializable.h"

namespace rocketmq {

// Bit positions inside the command's flag word; each one is used as `1 << position`.
constexpr int RPC_TYPE = 0;    // bit clear: REQUEST_COMMAND; bit set: RESPONSE_COMMAND
constexpr int RPC_ONEWAY = 1;  // bit clear: regular RPC; bit set: oneway RPC

/**
 * Hands out the next request id (used as the "opaque" value of a new command).
 *
 * A function-local atomic counter is bumped on every call. The sign bit of the
 * fetched value is cleared, so the id stays non-negative even after the
 * counter has wrapped around.
 */
int32_t RemotingCommand::createNewRequestId() {
  // Static storage duration: the counter is zero-initialized, so ids start at 0.
  static std::atomic<int32_t> nextId;
  const int32_t signBitCleared = std::numeric_limits<int32_t>::max();
  const int32_t ticket = nextId.fetch_add(1, std::memory_order_relaxed);
  return ticket & signBitCleared;
}

// Convenience constructor: same as the (code, remark, customHeader) overload
// with an empty remark.
RemotingCommand::RemotingCommand(int32_t code, CommandCustomHeader* customHeader)
    : RemotingCommand(code, std::string(), customHeader) {}

// Builds a new command by delegating to the full constructor with
// MQVersion::CURRENT_LANGUAGE, MQVersion::CURRENT_VERSION, a freshly generated
// request id as the opaque value, and a flag word with no bits set.
RemotingCommand::RemotingCommand(int32_t code, const std::string& remark, CommandCustomHeader* customHeader)
    : RemotingCommand(code,
                      MQVersion::CURRENT_LANGUAGE,
                      MQVersion::CURRENT_VERSION,
                      createNewRequestId(),
                      0,
                      remark,
                      customHeader) {}

/**
 * Full constructor: stores every header field exactly as given.
 *
 * The extension fields and the body are not touched here, so they keep their
 * default-initialized state.
 *
 * @param code          command code
 * @param language      language tag written to the "language" header field
 * @param version       version number written to the "version" header field
 * @param opaque        request id that correlates a response with its request
 * @param flag          flag word (see the RPC_TYPE / RPC_ONEWAY bit positions)
 * @param remark        free-form remark text
 * @param customHeader  custom header stored in custom_header_; may be nullptr.
 *                      NOTE(review): custom_header_ is moved in the move
 *                      constructor and read through .get(), so it is presumably
 *                      an owning smart pointer and this command takes
 *                      ownership - confirm against the class declaration.
 */
RemotingCommand::RemotingCommand(int32_t code,
                                 const std::string& language,
                                 int32_t version,
                                 int32_t opaque,
                                 int32_t flag,
                                 const std::string& remark,
                                 CommandCustomHeader* customHeader)
    : code_(code),
      language_(language),
      version_(version),
      opaque_(opaque),
      flag_(flag),
      remark_(remark),
      custom_header_(customHeader) {}

/**
 * Move constructor.
 *
 * The integer header fields are copied; the strings, the extension fields,
 * the custom header and the body are moved out of `command`.
 */
RemotingCommand::RemotingCommand(RemotingCommand&& command)
    : code_(command.code_),
      language_(std::move(command.language_)),
      version_(command.version_),
      opaque_(command.opaque_),
      flag_(command.flag_),
      remark_(std::move(command.remark_)),
      ext_fields_(std::move(command.ext_fields_)),
      custom_header_(std::move(command.custom_header_)),
      body_(std::move(command.body_)) {}

// Destructor defaulted here in the implementation file; it only runs the
// destructors of the members.
RemotingCommand::~RemotingCommand() = default;

/**
 * Serializes this command into a single wire frame:
 *
 *   [4 bytes: frame length][4 bytes: header length][JSON header][body]
 *
 * The frame length covers everything that follows it (the header-length word,
 * the JSON header and the optional body). The JSON header carries code,
 * language, version, opaque, flag, remark and "extFields"; extFields is the
 * extension-field map plus whatever the custom header (if present) writes
 * into it.
 *
 * @return the byte array backing the filled buffer.
 */
ByteArrayRef RemotingCommand::encode() const {
  Json::Value headerJson;
  headerJson["code"] = code_;
  headerJson["language"] = language_;
  headerJson["version"] = version_;
  headerJson["opaque"] = opaque_;
  headerJson["flag"] = flag_;
  headerJson["remark"] = remark_;

  // Extension fields first, then the custom header adds its own entries to the
  // same JSON object.
  Json::Value extFieldsJson;
  for (const auto& entry : ext_fields_) {
    extFieldsJson[entry.first] = entry.second;
  }
  if (custom_header_ != nullptr) {
    custom_header_->Encode(extFieldsJson);
  }
  headerJson["extFields"] = extFieldsJson;

  const std::string headerText = RemotingSerializable::toJson(headerJson);

  // Frame length = header-length word (4) + header bytes + body bytes.
  uint32_t frameLength = 4 + static_cast<uint32_t>(headerText.size());
  if (body_ != nullptr) {
    frameLength += static_cast<uint32_t>(body_->size());
  }

  // 4 extra bytes for the frame-length word itself.
  std::unique_ptr<ByteBuffer> frame(ByteBuffer::allocate(4 + frameLength));

  frame->putInt(frameLength);
  frame->putInt(static_cast<uint32_t>(headerText.size()));
  // ByteArray takes a mutable char*; the header text is only read from here.
  frame->put(ByteArray(const_cast<char*>(headerText.data()), headerText.size()));
  if (body_ != nullptr) {
    frame->put(*body_);
  }

  // The buffer is deliberately not flipped; its byte array is returned as-is.
  return frame->byte_array();
}

// Extracts the header size from the 4-byte header-length word: only the low
// 24 bits are kept, the top byte is masked off.
static inline int32_t getHeaderLength(int32_t length) {
  constexpr int32_t kLow24Bits = 0x00FFFFFF;
  return length & kLow24Bits;
}

/**
 * Decodes one wire frame into a RemotingCommand.
 *
 * Frame layout:
 *   [4 bytes package length (optional)] [4 bytes header length] [JSON header] [body]
 *
 * @param byteBuffer        buffer holding the frame; its position is advanced
 *                          past everything that is consumed.
 * @param hasPackageLength  true when the frame still starts with the 4-byte
 *                          package length, which is then skipped.
 * @return the decoded command. A non-empty body is a slice of byteBuffer's
 *         byte array rather than a copy. The custom header is left null;
 *         string-valued extFields are stored as extension fields.
 * @throws MQClientException when the declared header length is larger than the
 *         data left in the buffer, or when the header is not parseable JSON.
 */
static std::unique_ptr<RemotingCommand> Decode(ByteBuffer& byteBuffer, bool hasPackageLength) {
  int32_t length = byteBuffer.limit();
  if (hasPackageLength) {
    // skip package length
    (void)byteBuffer.getInt();
    length -= 4;
  }

  // decode header

  const int32_t oriHeaderLen = byteBuffer.getInt();
  const int32_t headerLength = getHeaderLength(oriHeaderLen);

  // The header length comes straight from the wire. Reject a value that claims
  // more bytes than the buffer still holds before building a view of that
  // size; otherwise the view would reach past the received data.
  const int32_t available = byteBuffer.limit() - byteBuffer.position();
  if (headerLength > available) {
    LOG_WARN_NEW("invalid header length {}, only {} bytes available", headerLength, available);
    THROW_MQEXCEPTION(MQClientException, "header length exceeds package size", -1);
  }

  // temporary ByteArray: a non-owning view onto the header bytes
  ByteArray headerData(byteBuffer.array() + byteBuffer.arrayOffset() + byteBuffer.position(), headerLength);
  byteBuffer.position(byteBuffer.position() + headerLength);

  Json::Value object;
  try {
    object = RemotingSerializable::fromJson(headerData);
  } catch (const std::exception& e) {
    LOG_WARN_NEW("parse json failed. {}", e.what());
    THROW_MQEXCEPTION(MQClientException, "conn't parse json", -1);
  }

  int32_t code = object["code"].asInt();
  std::string language = object["language"].asString();
  int32_t version = object["version"].asInt();
  int32_t opaque = object["opaque"].asInt();
  int32_t flag = object["flag"].asInt();
  std::string remark;
  if (!object["remark"].isNull()) {
    remark = object["remark"].asString();
  }

  std::unique_ptr<RemotingCommand> cmd(new RemotingCommand(code, language, version, opaque, flag, remark, nullptr));

  // Bind by reference: copying the Json::Value would deep-copy every field.
  const Json::Value& extFields = object["extFields"];
  if (!extFields.isNull()) {
    for (const auto& name : extFields.getMemberNames()) {
      const Json::Value& value = extFields[name];
      // only string-valued entries are carried over
      if (value.isString()) {
        cmd->set_ext_field(name, value.asString());
      }
    }
  }

  // decode body

  int32_t bodyLength = length - 4 - headerLength;
  if (bodyLength > 0) {
    // slice ByteArray of byteBuffer to avoid copy data.
    ByteArrayRef bodyData =
        slice(byteBuffer.byte_array(), byteBuffer.arrayOffset() + byteBuffer.position(), bodyLength);
    byteBuffer.position(byteBuffer.position() + bodyLength);
    cmd->set_body(std::move(bodyData));
  }

  LOG_DEBUG_NEW("code:{}, language:{}, version:{}, opaque:{}, flag:{}, remark:{}, headLen:{}, bodyLen:{}", code,
                language, version, opaque, flag, remark, headerLength, bodyLength);

  return cmd;
}

// Public entry point: wraps the byte array in a ByteBuffer and forwards to the
// file-local rocketmq::Decode.
std::unique_ptr<RemotingCommand> RemotingCommand::Decode(ByteArrayRef array, bool hasPackageLength) {
  const std::unique_ptr<ByteBuffer> wrapped(ByteBuffer::wrap(std::move(array)));
  return rocketmq::Decode(*wrapped, hasPackageLength);
}

// True when the RPC_TYPE bit of the flag word is set, i.e. the command has
// been marked as a response.
bool RemotingCommand::isResponseType() {
  const int responseBit = 1 << RPC_TYPE;
  return (flag_ & responseBit) != 0;
}

// Sets the RPC_TYPE bit of the flag word, marking the command as a response.
void RemotingCommand::markResponseType() {
  flag_ |= (1 << RPC_TYPE);
}

// True when the RPC_ONEWAY bit of the flag word is set.
bool RemotingCommand::isOnewayRPC() {
  const int onewayBit = 1 << RPC_ONEWAY;
  return (flag_ & onewayBit) != 0;
}

// Sets the RPC_ONEWAY bit of the flag word, marking the command as oneway.
void RemotingCommand::markOnewayRPC() {
  flag_ |= (1 << RPC_ONEWAY);
}

// Returns the pointer held by custom_header_ without giving up ownership.
// The result is null when no custom header is attached (for example on a
// command produced by Decode, which constructs it with a null header).
CommandCustomHeader* RemotingCommand::readCustomHeader() const {
  return custom_header_.get();
}

/**
 * Short human-readable summary: code, opaque, flag and body size.
 *
 * A command without a body (body_ is null, e.g. a freshly constructed request
 * or a decoded frame with an empty body) reports a body size of 0 instead of
 * dereferencing the null body.
 */
std::string RemotingCommand::toString() const {
  std::stringstream ss;
  ss << "code:" << code_ << ", opaque:" << opaque_ << ", flag:" << flag_ << ", body.size:";
  if (body_ != nullptr) {
    ss << body_->size();
  } else {
    ss << 0;
  }
  return ss.str();
}

}  // namespace rocketmq
