/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/DeadLetterPolicyBuilder.h>

#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "lib/ConsumerConfigurationImpl.h"
#include "lib/LogUtils.h"
#include "lib/MessageIdUtil.h"
#include "lib/UnAckedMessageTrackerEnabled.h"
#include "lib/Utils.h"

static const std::string lookupUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";

DECLARE_LOG_OBJECT()

namespace pulsar {

TEST(DeadLetterQueueTest, testDLQWithSchema) {
    Client client(lookupUrl);
    const std::string topic = "testDLQWithSchema-" + std::to_string(time(nullptr));
    const std::string subName = "test-sub";

    static const std::string jsonSchema =
        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
    SchemaInfo schemaInfo(JSON, "test-json", jsonSchema);

    auto dlqPolicy = DeadLetterPolicyBuilder()
                         .maxRedeliverCount(3)
                         .deadLetterTopic(topic + subName + "-DLQ")
                         .initialSubscriptionName("init-sub")
                         .build();
    ConsumerConfiguration consumerConfig;
    consumerConfig.setDeadLetterPolicy(dlqPolicy);
    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
    consumerConfig.setSchema(schemaInfo);
    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));

    // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists.
    ConsumerConfiguration dlqConsumerConfig;
    dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared);
    dlqConsumerConfig.setSchema(schemaInfo);
    Consumer deadLetterConsumer;
    ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig,
                                         deadLetterConsumer));

    Producer producer;
    ProducerConfiguration producerConfig;
    producerConfig.setSchema(schemaInfo);
    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
    std::string data = "{\"re\":2.1,\"im\":1.23}";
    const int num = 10;
    for (int i = 0; i < num; ++i) {
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build()));
    }

    // nack all msg.
    Message msg;
    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
        ASSERT_EQ(ResultOk, consumer.receive(msg));
        consumer.negativeAcknowledge(msg);
    }

    // assert dlq msg.
    for (int i = 0; i < num; i++) {
        ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000));
        ASSERT_FALSE(msg.getDataAsString().empty());
        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic));
        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
    }
    ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200));

    client.close();
}

// If the user never receives this message, the message should not be delivered to the DLQ.
TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) {
    Client client(lookupUrl);
    const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr));
    const std::string subName = "dlq-sub";
    auto dlqPolicy =
        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
    ConsumerConfiguration consumerConfig;
    consumerConfig.setDeadLetterPolicy(dlqPolicy);
    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
    // set ack timeout is 10 ms.
    PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConfig, 10);
    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
    producer.send(MessageBuilder().setContent("msg").build());

    // Wait a while, message should not be send to DLQ
    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    Message msg;
    ASSERT_EQ(ResultOk, consumer.receive(msg));
    client.close();
}

TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) {
    Client client(lookupUrl);
    const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr));
    const std::string subName = "dlq-sub";
    const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ";
    auto dlqPolicy =
        DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build();
    ConsumerConfiguration consumerConfig;
    consumerConfig.setDeadLetterPolicy(dlqPolicy);
    consumerConfig.setNegativeAckRedeliveryDelayMs(100);
    consumerConfig.setConsumerType(ConsumerType::ConsumerShared);
    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));

    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
    ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic);

    client.close();
}

class DeadLetterQueueTest : public ::testing::TestWithParam<std::tuple<bool, bool, ConsumerType>> {
   public:
    void SetUp() override {
        bool isProducerBatch = std::get<0>(GetParam());
        bool isMultiConsumer = std::get<1>(GetParam());
        ConsumerType consumerType = std::get<2>(GetParam());

        std::string testSuiteName = testing::UnitTest::GetInstance()->current_test_info()->name();
        std::replace(testSuiteName.begin(), testSuiteName.end(), '/', '_');
        topic_ = testSuiteName + std::to_string(time(nullptr));
        subName_ = "test-sub";
        dlqTopic_ = topic_ + "-" + subName_ + "-DLQ";

        if (isMultiConsumer) {
            // call admin api to make it partitioned
            std::string url = adminUrl + "admin/v2/persistent/public/default/" + topic_ + "/partitions";
            int res = makePutRequest(url, "5");
            LOG_INFO("res = " << res);
            ASSERT_FALSE(res != 204 && res != 409);
        }

        producerConf_.setBatchingEnabled(isProducerBatch);
        consumerConf_.setConsumerType(consumerType);
        consumerConf_.setDeadLetterPolicy(
            DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic_).build());
    }

    void TearDown() override { client_.close(); }

   protected:
    Client client_{lookupUrl};
    ProducerConfiguration producerConf_;
    ConsumerConfiguration consumerConf_;
    std::string topic_;
    std::string subName_;
    std::string dlqTopic_;
};

TEST_P(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck) {
    Client client(lookupUrl);

    Consumer consumer;
    PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConf_, 200);
    consumerConf_.setNegativeAckRedeliveryDelayMs(100);
    ASSERT_EQ(ResultOk, client.subscribe(topic_, subName_, consumerConf_, consumer));

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic_, producerConf_, producer));
    const int num = 100;
    Message msg;
    for (int i = 0; i < num; ++i) {
        msg = MessageBuilder()
                  .setContent(std::to_string(i))
                  .setPartitionKey("p-key")
                  .setOrderingKey("o-key")
                  .setProperty("pk-1", "pv-1")
                  .build();
        producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); });
    }

    // receive messages and don't ack.
    for (int i = 0; i < consumerConf_.getDeadLetterPolicy().getMaxRedeliverCount() * num + num; ++i) {
        ASSERT_EQ(ResultOk, consumer.receive(msg));
        // Randomly specify some messages manually negativeAcknowledge.
        if (rand() % 2 == 0) {
            consumer.negativeAcknowledge(msg);
        }
    }

    // assert dlq msg.
    Consumer deadLetterQueueConsumer;
    ConsumerConfiguration dlqConsumerConfig;
    dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, subName_, dlqConsumerConfig, deadLetterQueueConsumer));
    for (int i = 0; i < num; i++) {
        ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg));
        ASSERT_FALSE(msg.getDataAsString().empty());
        ASSERT_EQ(msg.getPartitionKey(), "p-key");
        ASSERT_EQ(msg.getOrderingKey(), "o-key");
        ASSERT_EQ(msg.getProperty("pk-1"), "pv-1");
        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_));
        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
    }

    ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200));
}

TEST_P(DeadLetterQueueTest, testSendDLQTriggerByRedeliverUnacknowledgedMessages) {
    Client client(lookupUrl);

    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic_, subName_, consumerConf_, consumer));

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic_, producerConf_, producer));

    const int num = 10;
    Message msg;
    for (int i = 0; i < num; ++i) {
        msg = MessageBuilder()
                  .setContent(std::to_string(i))
                  .setPartitionKey("p-key")
                  .setOrderingKey("o-key")
                  .setProperty("pk-1", "pv-1")
                  .build();
        producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); });
    }

    // nack all msg.
    for (int i = 1; i <= consumerConf_.getDeadLetterPolicy().getMaxRedeliverCount() * num + num; ++i) {
        ASSERT_EQ(ResultOk, consumer.receive(msg));
        if (i % num == 0) {
            consumer.redeliverUnacknowledgedMessages();
        }
    }

    // assert dlq msg.
    Consumer deadLetterQueueConsumer;
    ConsumerConfiguration dlqConsumerConfig;
    dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, subName_, dlqConsumerConfig, deadLetterQueueConsumer));
    for (int i = 0; i < num; i++) {
        ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg));
        ASSERT_FALSE(msg.getDataAsString().empty());
        ASSERT_EQ(msg.getPartitionKey(), "p-key");
        ASSERT_EQ(msg.getOrderingKey(), "o-key");
        ASSERT_EQ(msg.getProperty("pk-1"), "pv-1");
        ASSERT_EQ(msg.getMessageId().batchSize(), 0);
        ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_));
        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
    }
    ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200));
}

TEST_P(DeadLetterQueueTest, testSendDLQTriggerByNegativeAcknowledge) {
    Client client(lookupUrl);

    Consumer consumer;
    consumerConf_.setNegativeAckRedeliveryDelayMs(100);
    ASSERT_EQ(ResultOk, client.subscribe(topic_, subName_, consumerConf_, consumer));

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic_, producerConf_, producer));

    const int num = 10;
    Message msg;
    for (int i = 0; i < num; ++i) {
        msg = MessageBuilder()
                  .setContent(std::to_string(i))
                  .setPartitionKey("p-key")
                  .setOrderingKey("o-key")
                  .setProperty("pk-1", "pv-1")
                  .build();
        producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); });
    }

    // nack all msg.
    for (int i = 0; i < consumerConf_.getDeadLetterPolicy().getMaxRedeliverCount() * num + num; ++i) {
        ASSERT_EQ(ResultOk, consumer.receive(msg));
        consumer.negativeAcknowledge(msg);
    }

    // assert dlq msg.
    Consumer deadLetterQueueConsumer;
    ConsumerConfiguration dlqConsumerConfig;
    dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, "dlq-sub", dlqConsumerConfig, deadLetterQueueConsumer));
    for (int i = 0; i < num; i++) {
        ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg));
        ASSERT_FALSE(msg.getDataAsString().empty());
        ASSERT_EQ(msg.getPartitionKey(), "p-key");
        ASSERT_EQ(msg.getOrderingKey(), "o-key");
        ASSERT_EQ(msg.getProperty("pk-1"), "pv-1");
        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_));
        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
    }
    ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200));
}

TEST_P(DeadLetterQueueTest, testInitSubscription) {
    Client client(lookupUrl);

    const std::string dlqInitSub = "dlq-init-sub";
    auto dlqPolicy = DeadLetterPolicyBuilder()
                         .maxRedeliverCount(3)
                         .initialSubscriptionName(dlqInitSub)
                         .deadLetterTopic(dlqTopic_)
                         .build();
    consumerConf_.setDeadLetterPolicy(dlqPolicy);
    consumerConf_.setNegativeAckRedeliveryDelayMs(100);
    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic_, subName_, consumerConf_, consumer));

    Consumer deadLetterQueueConsumer;
    ConsumerConfiguration dlqConsumerConfig;
    dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, subName_, dlqConsumerConfig, deadLetterQueueConsumer));

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic_, producerConf_, producer));

    const int num = 10;
    Message msg;
    for (int i = 0; i < num; ++i) {
        msg = MessageBuilder().setContent(std::to_string(i)).build();
        ASSERT_EQ(ResultOk, producer.send(msg));
    }

    // nack all msg.
    for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) {
        ASSERT_EQ(ResultOk, consumer.receive(msg));
        consumer.negativeAcknowledge(msg);
    }

    // Use this subscription to ensure that messages are sent to the DLQ.
    for (int i = 0; i < num; i++) {
        ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg));
        ASSERT_FALSE(msg.getDataAsString().empty());
        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_));
        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
    }

    // If there is no initial subscription, then the subscription will not receive the DLQ messages sent
    // before the subscription.
    Consumer initDLQConsumer;
    ConsumerConfiguration initDLQConsumerConfig;
    dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionLatest);
    ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, dlqInitSub, initDLQConsumerConfig, initDLQConsumer));
    for (int i = 0; i < num; i++) {
        ASSERT_EQ(ResultOk, initDLQConsumer.receive(msg, 1000));
        ASSERT_FALSE(msg.getDataAsString().empty());
        ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_));
        ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty());
    }
    ASSERT_EQ(ResultTimeout, initDLQConsumer.receive(msg, 200));
}

INSTANTIATE_TEST_SUITE_P(Pulsar, DeadLetterQueueTest,
                         testing::Combine(testing::Values(true, false), testing::Values(true, false),
                                          testing::Values(ConsumerType::ConsumerShared,
                                                          ConsumerType::ConsumerKeyShared)),
                         [](const testing::TestParamInfo<DeadLetterQueueTest::ParamType> &info) {
                             return "isBatch_" + std::to_string(std::get<0>(info.param)) + "_isMultiTopics_" +
                                    std::to_string(std::get<1>(info.param)) + "_subType_" +
                                    std::to_string(std::get<2>(info.param));
                         });

}  // namespace pulsar
