/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "Commands.h"
#include "MessageImpl.h"
#include "VersionInternal.h"
#include "pulsar/MessageBuilder.h"
#include "LogUtils.h"
#include "PulsarApi.pb.h"
#include "Utils.h"
#include "Url.h"
#include <pulsar/Schema.h>
#include "checksum/ChecksumProvider.h"
#include <algorithm>
#include <mutex>

using namespace pulsar;
namespace pulsar {

using namespace pulsar::proto;

DECLARE_LOG_OBJECT();

static inline bool isBuiltInSchema(SchemaType schemaType) {
    switch (schemaType) {
        case STRING:
        case JSON:
        case AVRO:
        case PROTOBUF:
        case PROTOBUF_NATIVE:
            return true;

        default:
            return false;
    }
}

static inline proto::Schema_Type getSchemaType(SchemaType type) {
    switch (type) {
        case SchemaType::NONE:
            return Schema_Type_None;
        case STRING:
            return Schema_Type_String;
        case JSON:
            return Schema_Type_Json;
        case PROTOBUF:
            return Schema_Type_Protobuf;
        case AVRO:
            return Schema_Type_Avro;
        case PROTOBUF_NATIVE:
            return Schema_Type_ProtobufNative;
        default:
            return Schema_Type_None;
    }
}

static proto::Schema* getSchema(const SchemaInfo& schemaInfo) {
    proto::Schema* schema = proto::Schema().New();
    schema->set_name(schemaInfo.getName());
    schema->set_schema_data(schemaInfo.getSchema());
    schema->set_type(getSchemaType(schemaInfo.getSchemaType()));
    for (const auto& kv : schemaInfo.getProperties()) {
        proto::KeyValue* keyValue = proto::KeyValue().New();
        keyValue->set_key(kv.first);
        keyValue->set_value(kv.second);
        schema->mutable_properties()->AddAllocated(keyValue);
    }

    return schema;
}

SharedBuffer Commands::writeMessageWithSize(const BaseCommand& cmd) {
    size_t cmdSize = cmd.ByteSize();
    size_t frameSize = 4 + cmdSize;
    size_t bufferSize = 4 + frameSize;

    SharedBuffer buffer = SharedBuffer::allocate(bufferSize);

    buffer.writeUnsignedInt(frameSize);
    buffer.writeUnsignedInt(cmdSize);
    cmd.SerializeToArray(buffer.mutableData(), cmdSize);
    buffer.bytesWritten(cmdSize);
    return buffer;
}

SharedBuffer Commands::newPartitionMetadataRequest(const std::string& topic, uint64_t requestId) {
    static BaseCommand cmd;
    static std::mutex mutex;
    std::lock_guard<std::mutex> lock(mutex);
    cmd.set_type(BaseCommand::PARTITIONED_METADATA);
    CommandPartitionedTopicMetadata* partitionMetadata = cmd.mutable_partitionmetadata();
    partitionMetadata->set_topic(topic);
    partitionMetadata->set_request_id(requestId);
    const SharedBuffer buffer = writeMessageWithSize(cmd);
    cmd.clear_partitionmetadata();
    return buffer;
}

SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
                                 const std::string& listenerName) {
    static BaseCommand cmd;
    static std::mutex mutex;
    std::lock_guard<std::mutex> lock(mutex);
    cmd.set_type(BaseCommand::LOOKUP);
    CommandLookupTopic* lookup = cmd.mutable_lookuptopic();
    lookup->set_topic(topic);
    lookup->set_authoritative(authoritative);
    lookup->set_request_id(requestId);
    lookup->set_advertised_listener_name(listenerName);
    const SharedBuffer buffer = writeMessageWithSize(cmd);
    cmd.clear_lookuptopic();
    return buffer;
}

SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) {
    static BaseCommand cmd;
    static std::mutex mutex;
    std::lock_guard<std::mutex> lock(mutex);
    cmd.set_type(BaseCommand::CONSUMER_STATS);
    CommandConsumerStats* consumerStats = cmd.mutable_consumerstats();
    consumerStats->set_consumer_id(consumerId);
    consumerStats->set_request_id(requestId);
    const SharedBuffer buffer = writeMessageWithSize(cmd);
    cmd.clear_consumerstats();
    return buffer;
}

PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint64_t producerId,
                                   uint64_t sequenceId, ChecksumType checksumType, const Message& msg) {
    const proto::MessageMetadata& metadata = msg.impl_->metadata;
    SharedBuffer& payload = msg.impl_->payload;

    cmd.set_type(BaseCommand::SEND);
    CommandSend* send = cmd.mutable_send();
    send->set_producer_id(producerId);
    send->set_sequence_id(sequenceId);
    if (metadata.has_num_messages_in_batch()) {
        send->set_num_messages(metadata.num_messages_in_batch());
    }

    // / Wire format
    // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]

    int cmdSize = cmd.ByteSize();
    int msgMetadataSize = metadata.ByteSize();
    int payloadSize = payload.readableBytes();

    int magicAndChecksumLength = (Crc32c == (checksumType)) ? (2 + 4 /* magic + checksumLength*/) : 0;
    bool includeChecksum = magicAndChecksumLength > 0;
    int headerContentSize =
        4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize;  // cmdLength + cmdSize + magicLength +
    // checksumSize + msgMetadataLength + msgMetadataSize
    int totalSize = headerContentSize + payloadSize;
    int checksumReaderIndex = -1;

    headers.reset();
    assert(headers.writableBytes() >= (4 + headerContentSize));  // totalSize + headerLength
    headers.writeUnsignedInt(totalSize);                         // External frame

    // Write cmd
    headers.writeUnsignedInt(cmdSize);
    cmd.SerializeToArray(headers.mutableData(), cmdSize);
    headers.bytesWritten(cmdSize);

    // Create checksum placeholder
    if (includeChecksum) {
        headers.writeUnsignedShort(magicCrc32c);
        checksumReaderIndex = headers.writerIndex();
        headers.skipBytes(checksumSize);  // skip 4 bytes of checksum
    }

    // Write metadata
    headers.writeUnsignedInt(msgMetadataSize);
    metadata.SerializeToArray(headers.mutableData(), msgMetadataSize);
    headers.bytesWritten(msgMetadataSize);

    PairSharedBuffer composite;
    composite.set(0, headers);
    composite.set(1, payload);

    // Write checksum at created checksum-placeholder
    if (includeChecksum) {
        int writeIndex = headers.writerIndex();
        int metadataStartIndex = checksumReaderIndex + checksumSize;
        uint32_t metadataChecksum =
            computeChecksum(0, headers.data() + metadataStartIndex, (writeIndex - metadataStartIndex));
        uint32_t computedChecksum = computeChecksum(metadataChecksum, payload.data(), payload.writerIndex());
        // set computed checksum
        headers.setWriterIndex(checksumReaderIndex);
        headers.writeUnsignedInt(computedChecksum);
        headers.setWriterIndex(writeIndex);
    }

    cmd.clear_send();
    return composite;
}

SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
                                  bool connectingThroughProxy, Result& result) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::CONNECT);
    CommandConnect* connect = cmd.mutable_connect();
    connect->set_client_version(_PULSAR_VERSION_INTERNAL_);
    connect->set_auth_method_name(authentication->getAuthMethodName());
    connect->set_protocol_version(ProtocolVersion_MAX);

    FeatureFlags* flags = connect->mutable_feature_flags();
    flags->set_supports_auth_refresh(true);
    if (connectingThroughProxy) {
        Url logicalAddressUrl;
        Url::parse(logicalAddress, logicalAddressUrl);
        connect->set_proxy_to_broker_url(logicalAddressUrl.hostPort());
    }

    AuthenticationDataPtr authDataContent;
    result = authentication->getAuthData(authDataContent);
    if (result != ResultOk) {
        return SharedBuffer{};
    }

    if (authDataContent->hasDataFromCommand()) {
        connect->set_auth_data(authDataContent->getCommandData());
    }
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication, Result& result) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::AUTH_RESPONSE);
    CommandAuthResponse* authResponse = cmd.mutable_authresponse();
    authResponse->set_client_version(_PULSAR_VERSION_INTERNAL_);

    AuthData* authData = authResponse->mutable_response();
    authData->set_auth_method_name(authentication->getAuthMethodName());

    AuthenticationDataPtr authDataContent;
    result = authentication->getAuthData(authDataContent);
    if (result != ResultOk) {
        return SharedBuffer{};
    }

    if (authDataContent->hasDataFromCommand()) {
        authData->set_auth_data(authDataContent->getCommandData());
    }

    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
                                    uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
                                    const std::string& consumerName, SubscriptionMode subscriptionMode,
                                    Optional<MessageId> startMessageId, bool readCompacted,
                                    const std::map<std::string, std::string>& metadata,
                                    const SchemaInfo& schemaInfo,
                                    CommandSubscribe_InitialPosition subscriptionInitialPosition,
                                    bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
                                    int priorityLevel) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::SUBSCRIBE);
    CommandSubscribe* subscribe = cmd.mutable_subscribe();
    subscribe->set_topic(topic);
    subscribe->set_subscription(subscription);
    subscribe->set_subtype(subType);
    subscribe->set_consumer_id(consumerId);
    subscribe->set_request_id(requestId);
    subscribe->set_consumer_name(consumerName);
    subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
    subscribe->set_read_compacted(readCompacted);
    subscribe->set_initialposition(subscriptionInitialPosition);
    subscribe->set_replicate_subscription_state(replicateSubscriptionState);
    subscribe->set_priority_level(priorityLevel);

    if (isBuiltInSchema(schemaInfo.getSchemaType())) {
        subscribe->set_allocated_schema(getSchema(schemaInfo));
    }

    if (startMessageId.is_present()) {
        MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
        messageIdData.set_ledgerid(startMessageId.value().ledgerId());
        messageIdData.set_entryid(startMessageId.value().entryId());

        if (startMessageId.value().batchIndex() != -1) {
            messageIdData.set_batch_index(startMessageId.value().batchIndex());
        }
    }
    for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
         it++) {
        proto::KeyValue* keyValue = proto::KeyValue().New();
        keyValue->set_key(it->first);
        keyValue->set_value(it->second);
        subscribe->mutable_metadata()->AddAllocated(keyValue);
    }

    if (subType == CommandSubscribe_SubType_Key_Shared) {
        KeySharedMeta& ksm = *subscribe->mutable_keysharedmeta();
        switch (keySharedPolicy.getKeySharedMode()) {
            case pulsar::AUTO_SPLIT:
                ksm.set_keysharedmode(proto::KeySharedMode::AUTO_SPLIT);
                break;
            case pulsar::STICKY:
                ksm.set_keysharedmode(proto::KeySharedMode::STICKY);
                for (StickyRange range : keySharedPolicy.getStickyRanges()) {
                    IntRange* intRange = IntRange().New();
                    intRange->set_start(range.first);
                    intRange->set_end(range.second);
                    ksm.mutable_hashranges()->AddAllocated(intRange);
                }
        }
        ksm.set_allowoutoforderdelivery(keySharedPolicy.isAllowOutOfOrderDelivery());
    }

    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::UNSUBSCRIBE);
    CommandUnsubscribe* unsubscribe = cmd.mutable_unsubscribe();
    unsubscribe->set_consumer_id(consumerId);
    unsubscribe->set_request_id(requestId);

    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
                                   const std::string& producerName, uint64_t requestId,
                                   const std::map<std::string, std::string>& metadata,
                                   const SchemaInfo& schemaInfo, uint64_t epoch,
                                   bool userProvidedProducerName, bool encrypted) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::PRODUCER);
    CommandProducer* producer = cmd.mutable_producer();
    producer->set_topic(topic);
    producer->set_producer_id(producerId);
    producer->set_request_id(requestId);
    producer->set_epoch(epoch);
    producer->set_user_provided_producer_name(userProvidedProducerName);
    producer->set_encrypted(encrypted);

    for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
         it++) {
        proto::KeyValue* keyValue = proto::KeyValue().New();
        keyValue->set_key(it->first);
        keyValue->set_value(it->second);
        producer->mutable_metadata()->AddAllocated(keyValue);
    }

    if (isBuiltInSchema(schemaInfo.getSchemaType())) {
        producer->set_allocated_schema(getSchema(schemaInfo));
    }

    if (!producerName.empty()) {
        producer->set_producer_name(producerName);
    }

    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newAck(uint64_t consumerId, const MessageIdData& messageId, CommandAck_AckType ackType,
                              int validationError) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::ACK);
    CommandAck* ack = cmd.mutable_ack();
    ack->set_consumer_id(consumerId);
    ack->set_ack_type(ackType);
    if (CommandAck_AckType_IsValid(validationError)) {
        ack->set_validation_error((CommandAck_ValidationError)validationError);
    }
    *(ack->add_message_id()) = messageId;
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const std::set<MessageId>& msgIds) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::ACK);
    CommandAck* ack = cmd.mutable_ack();
    ack->set_consumer_id(consumerId);
    ack->set_ack_type(CommandAck_AckType_Individual);
    for (const auto& msgId : msgIds) {
        auto newMsgId = ack->add_message_id();
        newMsgId->set_ledgerid(msgId.ledgerId());
        newMsgId->set_entryid(msgId.entryId());
    }
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newFlow(uint64_t consumerId, uint32_t messagePermits) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::FLOW);
    CommandFlow* flow = cmd.mutable_flow();
    flow->set_consumer_id(consumerId);
    flow->set_messagepermits(messagePermits);
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newCloseProducer(uint64_t producerId, uint64_t requestId) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::CLOSE_PRODUCER);
    CommandCloseProducer* close = cmd.mutable_close_producer();
    close->set_producer_id(producerId);
    close->set_request_id(requestId);
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newCloseConsumer(uint64_t consumerId, uint64_t requestId) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::CLOSE_CONSUMER);
    CommandCloseConsumer* close = cmd.mutable_close_consumer();
    close->set_consumer_id(consumerId);
    close->set_request_id(requestId);
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newPing() {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::PING);
    cmd.mutable_ping();
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newPong() {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::PONG);
    cmd.mutable_pong();
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newRedeliverUnacknowledgedMessages(uint64_t consumerId,
                                                          const std::set<MessageId>& messageIds) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES);
    CommandRedeliverUnacknowledgedMessages* command = cmd.mutable_redeliverunacknowledgedmessages();
    command->set_consumer_id(consumerId);
    for (const auto& msgId : messageIds) {
        MessageIdData* msgIdData = command->add_message_ids();
        msgIdData->set_ledgerid(msgId.ledgerId());
        msgIdData->set_entryid(msgId.entryId());
    }
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const MessageId& messageId) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::SEEK);
    CommandSeek* commandSeek = cmd.mutable_seek();
    commandSeek->set_consumer_id(consumerId);
    commandSeek->set_request_id(requestId);

    MessageIdData& messageIdData = *commandSeek->mutable_message_id();
    messageIdData.set_ledgerid(messageId.ledgerId());
    messageIdData.set_entryid(messageId.entryId());
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, uint64_t timestamp) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::SEEK);
    CommandSeek* commandSeek = cmd.mutable_seek();
    commandSeek->set_consumer_id(consumerId);
    commandSeek->set_request_id(requestId);
    commandSeek->set_message_publish_time(timestamp);
    return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::GET_LAST_MESSAGE_ID);

    CommandGetLastMessageId* getLastMessageId = cmd.mutable_getlastmessageid();
    getLastMessageId->set_consumer_id(consumerId);
    getLastMessageId->set_request_id(requestId);
    const SharedBuffer buffer = writeMessageWithSize(cmd);
    cmd.clear_getlastmessageid();
    return buffer;
}

SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId) {
    BaseCommand cmd;
    cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
    CommandGetTopicsOfNamespace* getTopics = cmd.mutable_gettopicsofnamespace();
    getTopics->set_request_id(requestId);
    getTopics->set_namespace_(nsName);

    const SharedBuffer buffer = writeMessageWithSize(cmd);
    cmd.clear_gettopicsofnamespace();
    return buffer;
}

std::string Commands::messageType(BaseCommand_Type type) {
    switch (type) {
        case BaseCommand::CONNECT:
            return "CONNECT";
            break;
        case BaseCommand::CONNECTED:
            return "CONNECTED";
            break;
        case BaseCommand::SUBSCRIBE:
            return "SUBSCRIBE";
            break;
        case BaseCommand::PRODUCER:
            return "PRODUCER";
            break;
        case BaseCommand::SEND:
            return "SEND";
            break;
        case BaseCommand::SEND_RECEIPT:
            return "SEND_RECEIPT";
            break;
        case BaseCommand::SEND_ERROR:
            return "SEND_ERROR";
            break;
        case BaseCommand::MESSAGE:
            return "MESSAGE";
            break;
        case BaseCommand::ACK:
            return "ACK";
            break;
        case BaseCommand::FLOW:
            return "FLOW";
            break;
        case BaseCommand::UNSUBSCRIBE:
            return "UNSUBSCRIBE";
            break;
        case BaseCommand::SUCCESS:
            return "SUCCESS";
            break;
        case BaseCommand::ERROR:
            return "ERROR";
            break;
        case BaseCommand::CLOSE_PRODUCER:
            return "CLOSE_PRODUCER";
            break;
        case BaseCommand::CLOSE_CONSUMER:
            return "CLOSE_CONSUMER";
            break;
        case BaseCommand::PRODUCER_SUCCESS:
            return "PRODUCER_SUCCESS";
            break;
        case BaseCommand::PING:
            return "PING";
            break;
        case BaseCommand::PONG:
            return "PONG";
            break;
        case BaseCommand::PARTITIONED_METADATA:
            return "PARTITIONED_METADATA";
            break;
        case BaseCommand::PARTITIONED_METADATA_RESPONSE:
            return "PARTITIONED_METADATA_RESPONSE";
            break;
        case BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES:
            return "REDELIVER_UNACKNOWLEDGED_MESSAGES";
            break;
        case BaseCommand::LOOKUP:
            return "LOOKUP";
            break;
        case BaseCommand::LOOKUP_RESPONSE:
            return "LOOKUP_RESPONSE";
            break;
        case BaseCommand::CONSUMER_STATS:
            return "CONSUMER_STATS";
            break;
        case BaseCommand::CONSUMER_STATS_RESPONSE:
            return "CONSUMER_STATS_RESPONSE";
            break;
        case BaseCommand::REACHED_END_OF_TOPIC:
            return "REACHED_END_OF_TOPIC";
            break;
        case BaseCommand::SEEK:
            return "SEEK";
            break;
        case BaseCommand::ACTIVE_CONSUMER_CHANGE:
            return "ACTIVE_CONSUMER_CHANGE";
            break;
        case BaseCommand::GET_LAST_MESSAGE_ID:
            return "GET_LAST_MESSAGE_ID";
            break;
        case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
            return "GET_LAST_MESSAGE_ID_RESPONSE";
            break;
        case BaseCommand::GET_TOPICS_OF_NAMESPACE:
            return "GET_TOPICS_OF_NAMESPACE";
            break;
        case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
            return "GET_TOPICS_OF_NAMESPACE_RESPONSE";
            break;
        case BaseCommand::GET_SCHEMA:
            return "GET_SCHEMA";
            break;
        case BaseCommand::GET_SCHEMA_RESPONSE:
            return "GET_SCHEMA_RESPONSE";
            break;
        case BaseCommand::AUTH_CHALLENGE:
            return "AUTH_CHALLENGE";
            break;
        case BaseCommand::AUTH_RESPONSE:
            return "AUTH_RESPONSE";
            break;
        case BaseCommand::ACK_RESPONSE:
            return "ACK_RESPONSE";
            break;
        case BaseCommand::GET_OR_CREATE_SCHEMA:
            return "GET_OR_CREATE_SCHEMA";
        case BaseCommand::GET_OR_CREATE_SCHEMA_RESPONSE:
            return "GET_OR_CREATE_SCHEMA_RESPONSE";
        case BaseCommand::NEW_TXN:
            return "NEW_TXN";
            break;
        case BaseCommand::NEW_TXN_RESPONSE:
            return "NEW_TXN_RESPONSE";
            break;
        case BaseCommand::ADD_PARTITION_TO_TXN:
            return "ADD_PARTITION_TO_TXN";
            break;
        case BaseCommand::ADD_PARTITION_TO_TXN_RESPONSE:
            return "ADD_PARTITION_TO_TXN_RESPONSE";
            break;
        case BaseCommand::ADD_SUBSCRIPTION_TO_TXN:
            return "ADD_SUBSCRIPTION_TO_TXN";
            break;
        case BaseCommand::ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
            return "ADD_SUBSCRIPTION_TO_TXN_RESPONSE";
            break;
        case BaseCommand::END_TXN:
            return "END_TXN";
            break;
        case BaseCommand::END_TXN_RESPONSE:
            return "END_TXN_RESPONSE";
            break;
        case BaseCommand::END_TXN_ON_PARTITION:
            return "END_TXN_ON_PARTITION";
            break;
        case BaseCommand::END_TXN_ON_PARTITION_RESPONSE:
            return "END_TXN_ON_PARTITION_RESPONSE";
            break;
        case BaseCommand::END_TXN_ON_SUBSCRIPTION:
            return "END_TXN_ON_SUBSCRIPTION";
            break;
        case BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE:
            return "END_TXN_ON_SUBSCRIPTION_RESPONSE";
            break;
        case BaseCommand::TC_CLIENT_CONNECT_REQUEST:
            return "TC_CLIENT_CONNECT_REQUEST";
        case BaseCommand::TC_CLIENT_CONNECT_RESPONSE:
            return "TC_CLIENT_CONNECT_RESPONSE";
            break;
    };
    BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration value"));
}

void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::MessageMetadata& batchMetadata) {
    // metadata has already been set in ProducerImpl::setMessageMetadata
    const proto::MessageMetadata& metadata = msg.impl_->metadata;

    // required fields
    batchMetadata.set_producer_name(metadata.producer_name());
    batchMetadata.set_sequence_id(metadata.sequence_id());
    batchMetadata.set_publish_time(metadata.publish_time());

    // optional fields
    if (metadata.has_partition_key()) {
        batchMetadata.set_partition_key(metadata.partition_key());
    }
    if (metadata.has_ordering_key()) {
        batchMetadata.set_ordering_key(metadata.ordering_key());
    }
    if (metadata.has_replicated_from()) {
        batchMetadata.set_replicated_from(metadata.replicated_from());
    }
    if (metadata.replicate_to_size() > 0) {
        for (int i = 0; i < metadata.replicate_to_size(); i++) {
            batchMetadata.add_replicate_to(metadata.replicate_to(i));
        }
    }
    if (metadata.has_schema_version()) {
        batchMetadata.set_schema_version(metadata.schema_version());
    }
}

uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
                                                            unsigned long maxMessageSizeInBytes) {
    const auto& msgMetadata = msg.impl_->metadata;
    SingleMessageMetadata metadata;
    if (msgMetadata.has_partition_key()) {
        metadata.set_partition_key(msgMetadata.partition_key());
    }
    if (msgMetadata.has_ordering_key()) {
        metadata.set_ordering_key(msgMetadata.ordering_key());
    }

    metadata.mutable_properties()->Reserve(msgMetadata.properties_size());
    for (int i = 0; i < msgMetadata.properties_size(); i++) {
        auto keyValue = proto::KeyValue().New();
        *keyValue = msgMetadata.properties(i);
        metadata.mutable_properties()->AddAllocated(keyValue);
    }

    if (msgMetadata.has_event_time()) {
        metadata.set_event_time(msgMetadata.event_time());
    }

    if (msgMetadata.has_sequence_id()) {
        metadata.set_sequence_id(msgMetadata.sequence_id());
    }

    // Format of batch message
    // Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]

    int payloadSize = msg.impl_->payload.readableBytes();
    metadata.set_payload_size(payloadSize);

    int msgMetadataSize = metadata.ByteSize();

    unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize + payloadSize;
    if (batchPayLoad.writableBytes() <= sizeof(uint32_t) + msgMetadataSize + payloadSize) {
        LOG_DEBUG("remaining size of batchPayLoad buffer ["
                  << batchPayLoad.writableBytes() << "] can't accomodate new payload [" << requiredSpace
                  << "] - expanding the batchPayload buffer");
        uint32_t new_size =
            std::min(batchPayLoad.readableBytes() * 2, static_cast<uint32_t>(maxMessageSizeInBytes));
        new_size = std::max(new_size, batchPayLoad.readableBytes() + static_cast<uint32_t>(requiredSpace));
        SharedBuffer buffer = SharedBuffer::allocate(new_size);
        // Adding batch created so far
        buffer.write(batchPayLoad.data(), batchPayLoad.readableBytes());
        batchPayLoad = buffer;
    }
    // Adding the new message
    batchPayLoad.writeUnsignedInt(msgMetadataSize);
    metadata.SerializeToArray(batchPayLoad.mutableData(), msgMetadataSize);
    batchPayLoad.bytesWritten(msgMetadataSize);
    batchPayLoad.write(msg.impl_->payload.data(), payloadSize);

    return msgMetadata.sequence_id();
}

Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) {
    SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;

    // Format of batch message
    // Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]

    const int& singleMetaSize = uncompressedPayload.readUnsignedInt();
    SingleMessageMetadata metadata;
    metadata.ParseFromArray(uncompressedPayload.data(), singleMetaSize);
    uncompressedPayload.consume(singleMetaSize);

    const int& payloadSize = metadata.payload_size();

    // Get a slice of size payloadSize from offset readIndex_
    SharedBuffer payload = uncompressedPayload.slice(0, payloadSize);
    uncompressedPayload.consume(payloadSize);

    const MessageId& m = batchedMessage.impl_->messageId;
    MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex);
    Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata,
                          batchedMessage.impl_->getTopicName());
    singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;

    return singleMessage;
}

bool Commands::peerSupportsGetLastMessageId(int32_t peerVersion) { return peerVersion >= proto::v12; }

bool Commands::peerSupportsActiveConsumerListener(int32_t peerVersion) { return peerVersion >= proto::v12; }

bool Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) {
    return peerVersion >= proto::v12;
}

bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return peerVersion >= proto::v13; }

bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return peerVersion >= proto::v15; }
}  // namespace pulsar
/* namespace pulsar */
