/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include <atomic>
#include <ctime>
#include <functional>
#include <gtest/gtest.h>
#include <sstream>
#include <thread>
#include <unistd.h>

#include <lib/Commands.h>
#include <lib/Future.h>
#include <lib/Latch.h>
#include <lib/LogUtils.h>
#include <lib/TopicName.h>
#include <lib/Utils.h>
#include <pulsar/Client.h>
#include <pulsar/MessageBatch.h>
#include <pulsar/MessageBuilder.h>

#include "ConsumerTest.h"
#include "CustomRoutingPolicy.h"
#include "HttpHelper.h"
#include "PulsarFriend.h"

DECLARE_LOG_OBJECT();

using namespace pulsar;

static int globalCount = 0;
static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";

// ecpoch time in seconds
long epochTime = time(NULL);

static void messageListenerFunction(Consumer consumer, const Message& msg) {
    globalCount++;
    consumer.acknowledge(msg);
}

class MessageCountSendCallback {
   public:
    MessageCountSendCallback(std::atomic_int& numOfMessagesProduced)
        : numOfMessagesProduced_(numOfMessagesProduced) {}

    void operator()(Result result, const MessageId&) {
        ASSERT_EQ(result, ResultOk);
        numOfMessagesProduced_++;
    }

   private:
    std::atomic_int& numOfMessagesProduced_;
};

static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, expect_result); }

static int globalPublishCountSuccess = 0;
static int globalPublishCountQueueFull = 0;

static void sendCallBackExpectingErrors(Result r, const MessageId& msgId) {
    if (r == ResultProducerQueueIsFull) {
        globalPublishCountQueueFull++;
    } else if (r == ResultOk) {
        globalPublishCountSuccess++;
    }
}

TEST(BatchMessageTest, testProducerConfig) {
    ProducerConfiguration conf;
    try {
        conf.setBatchingMaxMessages(1);
        FAIL();
    } catch (const std::exception&) {
        // Ok
    }
    ASSERT_EQ(ProducerConfiguration::DefaultBatching, conf.getBatchingType());
    conf.setBatchingType(ProducerConfiguration::KeyBasedBatching);
    ASSERT_EQ(ProducerConfiguration::KeyBasedBatching, conf.getBatchingType());
}

TEST(BatchMessageTest, testProducerTimeout) {
    std::string testName = std::to_string(epochTime) + "testProducerTimeout";

    ClientConfiguration clientConf;
    clientConf.setStatsIntervalInSeconds(1);

    Client client(lookupUrl, clientConf);
    std::string topicName = "persistent://public/default/" + testName;
    std::string subName = "subscription-name";
    Producer producer;

    // Enable batching on producer side
    int batchSize = 3;
    int numOfMessages = 4;
    int timeout = 4000;
    ProducerConfiguration conf;
    conf.setCompressionType(CompressionLZ4);
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingMaxPublishDelayMs(timeout);
    conf.setBatchingEnabled(true);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    // handling dangling subscriptions
    consumer.close();
    client.subscribe(topicName, subName, consumer);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);

    ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer);
    // Send Asynchronously
    std::string prefix = "msg-batch-test-produce-timeout-";
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg = MessageBuilder()
                          .setContent(messageContent)
                          .setProperty("type", "batch")
                          .setProperty("msgIndex", std::to_string(i))
                          .build();
        LOG_DEBUG("sending message " << messageContent);
        clock_t start, end;
        /* Start the timer */
        start = time(NULL);
        LOG_DEBUG("start = " << start);
        Promise<Result, MessageId> promise;
        producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promise));
        MessageId mi;
        promise.getFuture().get(mi);
        /* End the timer */
        end = time(NULL);
        LOG_DEBUG("end = " << end);
        // Greater than or equal to since there may be delay in sending messaging
        ASSERT_GE((double)(end - start), timeout / 1000.0);
        ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
        ASSERT_EQ(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), i + 1);
    }

    Message receivedMsg;
    int i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
        ASSERT_EQ(receivedMsg.getProperty("type"), "batch");
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
    }
    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);
}

TEST(BatchMessageTest, testBatchSizeInBytes) {
    std::string testName = std::to_string(epochTime) + "testBatchSizeInBytes";

    Client client(lookupUrl);
    std::string topicName = "persistent://public/default/" + testName;
    std::string subName = "subscription-name";
    Producer producer;

    // Enable batching on producer side
    int batchSize = 1000;
    int numOfMessages = 30;
    ProducerConfiguration conf;
    conf.setCompressionType(CompressionLZ4);
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingMaxAllowedSizeInBytes(20);
    conf.setBatchingEnabled(true);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    // handling dangling subscriptions
    consumer.close();
    client.subscribe(topicName, subName, consumer);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);

    ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer);
    // Send Asynchronously
    std::atomic_int numOfMessagesProduced{0};
    std::string prefix = "12345678";
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg =
            MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
        producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
        ASSERT_EQ(producerStatsImplPtr->getNumMsgsSent(), i + 1);
        ASSERT_LT(PulsarFriend::sum(producerStatsImplPtr->getSendMap()), i + 1);
        ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
        ASSERT_LT(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), i + 1);
        LOG_DEBUG("sending message " << messageContent);
    }

    Message receivedMsg;
    int i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_LT(pulsar::PulsarFriend::getBatchIndex(receivedMsg.getMessageId()), 2);
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
    }

    // Check stats
    ASSERT_EQ(PulsarFriend::sum(producerStatsImplPtr->getSendMap()), numOfMessages);
    ASSERT_EQ(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), numOfMessages);

    // Number of messages produced
    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);
}

TEST(BatchMessageTest, testSmallReceiverQueueSize) {
    std::string testName = std::to_string(epochTime) + "testSmallReceiverQueueSize";

    ClientConfiguration clientConf;
    clientConf.setStatsIntervalInSeconds(20);

    Client client(lookupUrl, clientConf);
    std::string topicName = "persistent://public/default/" + testName;
    std::string subName = "subscription-name";
    Producer producer;

    // Enable batching on producer side
    int batchSize = 10;
    int numOfMessages = 1000;
    ProducerConfiguration conf;
    conf.setCompressionType(CompressionLZ4);
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingMaxPublishDelayMs(1);
    conf.setBatchingEnabled(true);
    conf.setMaxPendingMessages(numOfMessages + 1);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    ConsumerConfiguration consumerConfig;
    consumerConfig.setReceiverQueueSize(41);

    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, consumerConfig,
                          WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    // handling dangling subscriptions
    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);

    ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer);
    // Send Asynchronously
    std::atomic_int numOfMessagesProduced{0};
    std::string prefix = testName;
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg =
            MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
        producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
        ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
        ASSERT_LE(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), i + 1);
        LOG_DEBUG("sending message " << messageContent);
    }

    Message receivedMsg;
    int i = 0;
    for (i = 0; i < numOfMessages; i++) {
        consumer.receive(receivedMsg);
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
    }

    ConsumerStatsImplPtr consumerStatsImplPtr = PulsarFriend::getConsumerStatsPtr(consumer);
    unsigned long t = consumerStatsImplPtr->getAckedMsgMap().at(
        std::make_pair<Result, proto::CommandAck_AckType>(ResultOk, proto::CommandAck_AckType_Individual));
    ASSERT_EQ(t, numOfMessages);
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), numOfMessages);
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), numOfMessages);
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getReceivedMsgMap()), numOfMessages);
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalReceivedMsgMap()), numOfMessages);
    ASSERT_EQ(consumerStatsImplPtr->getTotalNumBytesRecieved(), consumerStatsImplPtr->getNumBytesRecieved());
    std::this_thread::sleep_for(std::chrono::seconds(20));
    ASSERT_NE(consumerStatsImplPtr->getTotalNumBytesRecieved(), consumerStatsImplPtr->getNumBytesRecieved());
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), numOfMessages);
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalReceivedMsgMap()), numOfMessages);
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), 0);
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getReceivedMsgMap()), 0);

    // Number of messages produced
    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);
}

TEST(BatchMessageTest, testIndividualAck) {
    std::string testName = std::to_string(epochTime) + "testIndividualAck";

    ClientConfiguration clientConfig;
    clientConfig.setStatsIntervalInSeconds(1);

    Client client(lookupUrl, clientConfig);
    std::string topicName = "persistent://public/default/" + testName;
    std::string subName = "subscription-name";
    Producer producer;

    // Enable batching on producer side
    int batchSize = 5;
    int numOfMessages = 10;
    ProducerConfiguration conf;
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingEnabled(true);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    ConsumerConfiguration consumerConfig;
    consumerConfig.setReceiverQueueSize(1);

    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, consumerConfig,
                          WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    // handling dangling subscriptions
    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);

    // Send Asynchronously
    std::atomic_int numOfMessagesProduced{0};
    std::string prefix = testName;
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg =
            MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
        producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
        LOG_DEBUG("sending message " << messageContent);
    }
    Message receivedMsg;
    int i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        // Ack every 2nd message
        if (i % 2 == 0) {
            ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
        }
    }
    // Number of messages produced
    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);

    // Unsubscribe and resubscribe
    // Expecting all messages to be sent again

    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        // Ack every first 5 and 10th message
        if (i <= 5 || i == 10) {
            ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
        }
    }

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);

    // Unsubscribe and resubscribe
    // Expecting only one batch message to be resent

    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i + numOfMessages / 2);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++ + numOfMessages / 2));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        // Ack first 4 message only
        if (i <= 4) {
            ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
        }
    }

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages / 2);

    // Unsubscribe and resubscribe
    // Expecting only one batch message to be resent

    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i + numOfMessages / 2);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++ + numOfMessages / 2));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        // Ack all
        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
    }

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages / 2);

    // Unsubscribe and resubscribe
    // Expecting no batch message to be resent

    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    // Number of messages consumed
    ASSERT_NE(ResultOk, consumer.receive(receivedMsg, 5000));

    consumer.close();
    client.close();
}

TEST(BatchMessageTest, testCumulativeAck) {
    std::string testName = std::to_string(epochTime) + "testCumulativeAck";

    ClientConfiguration clientConfig;
    clientConfig.setStatsIntervalInSeconds(100);

    Client client(lookupUrl, clientConfig);
    std::string topicName = "persistent://public/default/" + testName;
    std::string subName = "subscription-name";
    Producer producer;

    // Enable batching on producer side
    int batchSize = 5;
    int numOfMessages = 15;
    ProducerConfiguration conf;
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingEnabled(true);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    ConsumerConfiguration consumerConfig;
    consumerConfig.setReceiverQueueSize(1);

    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, consumerConfig,
                          WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    // handling dangling subscriptions
    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);
    ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer);

    // Send Asynchronously
    std::atomic_int numOfMessagesProduced{0};
    std::string prefix = testName;
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg =
            MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
        producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
        LOG_DEBUG("sending message " << messageContent);
    }

    Message receivedMsg;
    int i = 0;
    ConsumerStatsImplPtr consumerStatsImplPtr = PulsarFriend::getConsumerStatsPtr(consumer);
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        // Cumm. Ack 7th message
        if (i == 7) {
            ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
        }
    }

    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getReceivedMsgMap()), i + 1);
    // Since last receive call times out
    ASSERT_EQ(consumerStatsImplPtr->getReceivedMsgMap().at(ResultOk), i);
    ASSERT_EQ(consumerStatsImplPtr->getReceivedMsgMap().at(ResultTimeout), 1);
    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), 1);
    ASSERT_EQ(producerStatsImplPtr->getNumBytesSent(), consumerStatsImplPtr->getNumBytesRecieved());
    unsigned long t = consumerStatsImplPtr->getAckedMsgMap().at(
        std::make_pair<Result, proto::CommandAck_AckType>(ResultOk, proto::CommandAck_AckType_Cumulative));
    ASSERT_EQ(t, 1);

    // Number of messages produced
    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);

    // Unsubscribe and resubscribe
    // Expecting 10 messages to be sent again

    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    consumerStatsImplPtr = PulsarFriend::getConsumerStatsPtr(consumer);
    i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i + 5);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++ + 5));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        // Ack 10th message
        if (i == 10) {
            ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
        }
    }

    ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), 1);
    t = consumerStatsImplPtr->getAckedMsgMap().at(
        std::make_pair<Result, proto::CommandAck_AckType>(ResultOk, proto::CommandAck_AckType_Cumulative));
    ASSERT_EQ(t, 1);

    // Number of messages consumed
    ASSERT_EQ(i, 10);

    // Unsubscribe and resubscribe
    // Expecting no batch message to be resent

    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    // Number of messages consumed
    ASSERT_NE(ResultOk, consumer.receive(receivedMsg, 5000));
}

TEST(BatchMessageTest, testMixedAck) {
    std::string testName = std::to_string(epochTime) + "testMixedAck";

    Client client(lookupUrl);
    std::string topicName = "persistent://public/default/" + testName;
    std::string subName = "subscription-name";
    Producer producer;

    // Enable batching on producer side
    int batchSize = 5;
    int numOfMessages = 15;
    ProducerConfiguration conf;
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingEnabled(true);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    ConsumerConfiguration consumerConfig;
    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, consumerConfig,
                          WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    // handling dangling subscriptions
    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);

    // Send Asynchronously
    std::atomic_int numOfMessagesProduced{0};
    std::string prefix = testName;
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg =
            MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
        producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
        LOG_DEBUG("sending message " << messageContent);
    }

    Message receivedMsg;
    int i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        // Cumm. Ack 14th message
        if (i == 14) {
            ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
        }
    }
    // Number of messages produced
    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);

    // Unsubscribe and resubscribe
    // Expecting 5 messages to be sent again

    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i + 10);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++ + 10));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        // Cumm Ack 9th message
        if (i == 4) {
            ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
        }
    }
    ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));

    // Number of messages consumed
    ASSERT_EQ(i, 5);

    // Unsubscribe and resubscribe
    // Expecting no batch message to be resent

    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    // Number of messages consumed
    ASSERT_NE(ResultOk, consumer.receive(receivedMsg, 5000));
}

// Also testing Cumulative Ack test case where greatestCumulativeAck returns
// MessageId()
TEST(BatchMessageTest, testPermits) {
    std::string testName = std::to_string(epochTime) + "testPermits";

    Client client(lookupUrl);
    std::string topicName = "persistent://public/default/" + testName;
    std::string subName = "subscription-name";
    Producer producer;

    // Enable batching on producer side
    int batchSize = 10;
    int numOfMessages = 75;
    ProducerConfiguration conf;
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingMaxPublishDelayMs(5);
    conf.setBatchingEnabled(true);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    ConsumerConfiguration consumerConfig;
    consumerConfig.setReceiverQueueSize(5);

    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, consumerConfig,
                          WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    // handling dangling subscriptions
    consumer.close();
    client.subscribe(topicName, subName, consumerConfig, consumer);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);

    // Send Asynchronously
    std::atomic_int numOfMessagesProduced{0};
    std::string prefix = testName;
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg =
            MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
        producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
        LOG_DEBUG("sending message " << messageContent);
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    Message receivedMsg;
    int i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
    }
    // Number of messages produced
    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);

    // Since all messages are acked
    // Creating 25 new non batched message
    conf.setBatchingEnabled(false);

    client.createProducer(topicName, conf, producer);

    numOfMessagesProduced = 0;
    // Send Asynchronously
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg =
            MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
        producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
        LOG_DEBUG("sending message " << messageContent);
    }
    std::this_thread::sleep_for(std::chrono::seconds(5));

    ASSERT_LE(ConsumerTest::getNumOfMessagesInQueue(consumer), consumerConfig.getReceiverQueueSize());
    ASSERT_GE(ConsumerTest::getNumOfMessagesInQueue(consumer), consumerConfig.getReceiverQueueSize() / 2);

    i = 0;
    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
        std::string expectedMessageContent = prefix + std::to_string(i);
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
        ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
    }
    // Number of messages produced
    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages);
}

TEST(BatchMessageTest, testPartitionedTopics) {
    Client client(lookupUrl);
    std::string topicName =
        "persistent://public/default/test-partitioned-batch-messages-" + std::to_string(epochTime);

    // call admin api to make it partitioned
    std::string url = adminUrl + "admin/v2/persistent/public/default/test-partitioned-batch-messages-" +
                      std::to_string(epochTime) + "/partitions";
    int res = makePutRequest(url, "7");

    LOG_DEBUG("res = " << res);
    ASSERT_FALSE(res != 204 && res != 409);

    std::this_thread::sleep_for(std::chrono::seconds(2));

    Producer producer;
    // Enable batching on producer side
    int batchSize = 100;
    int numOfMessages = 10000;
    ProducerConfiguration conf;

    conf.setCompressionType(CompressionZLib);
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingEnabled(true);
    conf.setBatchingMaxPublishDelayMs(5);
    conf.setBlockIfQueueFull(false);
    conf.setMaxPendingMessages(10);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    std::string subName = "subscription-name";
    Consumer consumer;
    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);

    globalPublishCountSuccess = 0;
    globalPublishCountQueueFull = 0;

    // Send Asynchronously
    std::string prefix = "msg-batch-";
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg =
            MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
        producer.sendAsync(msg, &sendCallBackExpectingErrors);
        LOG_DEBUG("sending message " << messageContent);
    }

    Message receivedMsg;
    int i = 0;
    while (consumer.receive(receivedMsg, 30000) == ResultOk) {
        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                       << receivedMsg.getMessageId() << "]");
        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
        i++;
    }

    LOG_DEBUG("globalPublishCountQueueFull = " << globalPublishCountQueueFull);
    LOG_DEBUG("globalPublishCountSuccess = " << globalPublishCountSuccess);
    LOG_DEBUG("numOfMessages = " << numOfMessages);

    // Number of messages produced
    ASSERT_EQ(globalPublishCountSuccess + globalPublishCountQueueFull, numOfMessages);

    // Number of messages consumed
    ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull);
}

TEST(BatchMessageTest, producerFailureResult) {
    std::string testName = std::to_string(epochTime) + "testCumulativeAck";

    ClientConfiguration clientConfig;
    clientConfig.setStatsIntervalInSeconds(100);

    Client client(lookupUrl, clientConfig);
    std::string topicName = "persistent://public/default/" + testName;
    std::string subName = "subscription-name";
    Producer producer;

    int batchSize = 100;
    int numOfMessages = 10000;
    ProducerConfiguration conf;

    conf.setCompressionType(CompressionZLib);
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingEnabled(true);
    conf.setBatchingMaxPublishDelayMs(50000);
    conf.setBlockIfQueueFull(false);
    conf.setMaxPendingMessages(10);

    Result res = Result::ResultBrokerMetadataError;

    client.createProducer(topicName, conf, producer);
    Message msg = MessageBuilder().setContent("test").build();
    producer.sendAsync(msg, std::bind(&sendFailCallBack, std::placeholders::_1, res));
    PulsarFriend::producerFailMessages(producer, res);
}

TEST(BatchMessageTest, testPraseMessageBatchEntry) {
    struct Case {
        std::string content;
        std::string propKey;
        std::string propValue;
    };
    std::vector<Case> cases;
    cases.push_back(Case{"example1", "prop1", "value1"});
    cases.push_back(Case{"example2", "prop2", "value2"});

    SharedBuffer payload = SharedBuffer::allocate(128);
    for (auto it = cases.begin(); it != cases.end(); ++it) {
        MessageBuilder msgBuilder;
        const Message& message =
            msgBuilder.setContent(it->content).setProperty(it->propKey, it->propValue).build();
        Commands::serializeSingleMessageInBatchWithPayload(message, payload, 1024);
    }

    MessageBatch messageBatch;
    MessageId fakeId(0, 5000, 10, -1);
    messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast<uint32_t>(cases.size()));
    const std::vector<Message>& messages = messageBatch.messages();

    ASSERT_EQ(messages.size(), cases.size());
    for (int i = 0; i < cases.size(); ++i) {
        const Message& message = messages[i];
        const Case& expected = cases[i];
        ASSERT_EQ(message.getMessageId().batchIndex(), i);
        ASSERT_EQ(message.getMessageId().ledgerId(), 5000);
        ASSERT_EQ(message.getDataAsString(), expected.content);
        ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
    }
}

TEST(BatchMessageTest, testSendCallback) {
    const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";

    Client client(lookupUrl);

    constexpr int numMessagesOfBatch = 3;

    ProducerConfiguration producerConfig;
    producerConfig.setBatchingEnabled(true);
    producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
    producerConfig.setBatchingMaxPublishDelayMs(1000);  // 1 s, it's long enough for 3 messages batched
    producerConfig.setMaxPendingMessages(5);

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));

    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));

    Latch latch(numMessagesOfBatch);
    std::set<MessageId> sentIdSet;
    for (int i = 0; i < numMessagesOfBatch; i++) {
        const auto msg = MessageBuilder().setContent("a").build();
        producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId& id) {
            ASSERT_EQ(ResultOk, result);
            ASSERT_EQ(i, id.batchIndex());
            sentIdSet.emplace(id);
            LOG_INFO("id of batch " << i << ": " << id);
            latch.countdown();
        });
    }

    std::set<MessageId> receivedIdSet;
    for (int i = 0; i < numMessagesOfBatch; i++) {
        Message msg;
        ASSERT_EQ(ResultOk, consumer.receive(msg));
        receivedIdSet.emplace(msg.getMessageId());
        consumer.acknowledge(msg);
    }

    latch.wait();
    ASSERT_EQ(sentIdSet, receivedIdSet);

    consumer.close();
    producer.close();
    client.close();
}

TEST(BatchMessageTest, testProducerQueueWithBatches) {
    std::string testName = std::to_string(epochTime) + "testProducerQueueWithBatches";

    ClientConfiguration clientConf;
    clientConf.setStatsIntervalInSeconds(0);

    Client client(lookupUrl, clientConf);
    std::string topicName = "persistent://public/default/" + testName;

    // Enable batching on producer side
    ProducerConfiguration conf;
    conf.setBlockIfQueueFull(false);
    conf.setMaxPendingMessages(10);
    conf.setBatchingMaxMessages(10000);
    conf.setBatchingMaxPublishDelayMs(1000);
    conf.setBatchingEnabled(true);

    Producer producer;
    Result result = client.createProducer(topicName, conf, producer);
    ASSERT_EQ(ResultOk, result);

    std::string prefix = "msg-batch-test-produce-timeout-";
    int rejectedMessges = 0;
    for (int i = 0; i < 20; i++) {
        std::string messageContent = prefix + std::to_string(i);
        Message msg = MessageBuilder().setContent("hello").build();

        producer.sendAsync(msg, [&rejectedMessges](Result result, const MessageId& id) {
            if (result == ResultProducerQueueIsFull) {
                ++rejectedMessges;
            }
        });
    }

    ASSERT_EQ(rejectedMessges, 10);
}