/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <time.h>

#include <atomic>
#include <map>
#include <utility>

#include "lib/Latch.h"

using namespace pulsar;

static ProducerConfiguration createDefaultProducerConfig() {
    // The default producer configuration only use number of messages to limit batching
    return ProducerConfiguration()
        .setBatchingType(ProducerConfiguration::KeyBasedBatching)
        .setBatchingMaxAllowedSizeInBytes(static_cast<unsigned long>(-1))
        .setBatchingMaxPublishDelayMs(3600 * 1000);
}

class KeyBasedBatchingTest : public ::testing::Test {
   protected:
    KeyBasedBatchingTest() : client_("pulsar://localhost:6650") {}

    void TearDown() override { client_.close(); }

    void initTopicName(const std::string& testName) {
        topicName_ = "KeyBasedBatchingTest-" + testName + "-" + std::to_string(time(nullptr));
    }

    void initProducer(const ProducerConfiguration& producerConfig) {
        ASSERT_EQ(ResultOk, client_.createProducer(topicName_, producerConfig, producer_));
    }

    void initConsumer() { ASSERT_EQ(ResultOk, client_.subscribe(topicName_, "SubscriptionName", consumer_)); }

    void receiveAndAck(Message& msg) {
        ASSERT_EQ(ResultOk, consumer_.receive(msg, 3000));
        ASSERT_EQ(ResultOk, consumer_.acknowledge(msg));
    }

    Client client_;
    Producer producer_;
    Consumer consumer_;
    std::string topicName_;
};

TEST_F(KeyBasedBatchingTest, testFlush) {
    initTopicName("Flush");
    // no limits for batching
    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(
        static_cast<unsigned int>(-1))  // no limits for batching
    );

    constexpr int numMessages = 100;
    const std::string keys[] = {"A", "B"};
    std::atomic_int numMessageSent{0};
    for (int i = 0; i < numMessages; i++) {
        producer_.sendAsync(MessageBuilder().setOrderingKey(keys[i % 2]).setContent("x").build(),
                            [&numMessageSent](Result result, const MessageId&) {
                                numMessageSent++;
                                ASSERT_EQ(result, ResultOk);
                            });
    }

    ASSERT_EQ(ResultOk, producer_.flush());
    ASSERT_EQ(numMessageSent.load(), numMessages);
}

TEST_F(KeyBasedBatchingTest, testOrderingKeyPriority) {
    initTopicName("OrderingKeyPriority");
    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(3));
    initConsumer();

    Latch latch(3);
    auto sendCallback = [&latch](Result result, const MessageId& id) {
        ASSERT_EQ(result, ResultOk);
        latch.countdown();
    };
    // "0" is send to batch of "A" because ordering key has higher priority
    producer_.sendAsync(MessageBuilder().setContent("0").setOrderingKey("A").setPartitionKey("B").build(),
                        sendCallback);
    producer_.sendAsync(MessageBuilder().setContent("1").setOrderingKey("A").build(), sendCallback);
    producer_.sendAsync(MessageBuilder().setContent("2").setOrderingKey("B").build(), sendCallback);
    latch.countdown();

    Message msg;
    receiveAndAck(msg);
    ASSERT_EQ("0", msg.getDataAsString());
    ASSERT_EQ("A", msg.getOrderingKey());
    ASSERT_EQ("B", msg.getPartitionKey());
    receiveAndAck(msg);
    ASSERT_EQ("1", msg.getDataAsString());
    ASSERT_EQ("A", msg.getOrderingKey());
    receiveAndAck(msg);
    ASSERT_EQ("2", msg.getDataAsString());
    ASSERT_EQ("B", msg.getOrderingKey());
}

TEST_F(KeyBasedBatchingTest, testSequenceId) {
    initTopicName("SequenceId");
    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(6));
    initConsumer();

    Latch latch(6);
    auto sendAsync = [this, &latch](const std::string& key, const std::string& value) {
        producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
                            [&latch](Result result, const MessageId& id) {
                                ASSERT_EQ(result, ResultOk);
                                latch.countdown();
                            });
    };
    sendAsync("A", "0");
    sendAsync("B", "1");
    sendAsync("C", "2");
    sendAsync("B", "3");
    sendAsync("C", "4");
    sendAsync("A", "5");
    // sequence id: B < C < A, so there are 3 batches in order as following:
    //   B: 1, 3
    //   C: 2, 4
    //   A: 0, 5
    latch.wait();

    std::vector<std::string> receivedKeys;
    std::vector<std::string> receivedValues;
    for (int i = 0; i < 6; i++) {
        Message msg;
        receiveAndAck(msg);
        receivedKeys.emplace_back(msg.getOrderingKey());
        receivedValues.emplace_back(msg.getDataAsString());
    }

    decltype(receivedKeys) expectedKeys{"B", "B", "C", "C", "A", "A"};
    decltype(receivedValues) expectedValues{"1", "3", "2", "4", "0", "5"};
    EXPECT_EQ(receivedKeys, expectedKeys);
    EXPECT_EQ(receivedValues, expectedValues);
}

TEST_F(KeyBasedBatchingTest, testSingleBatch) {
    initTopicName("SingleBatch");
    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(5));
    initConsumer();

    constexpr int numMessages = 5 * 100;
    std::atomic_int numMessageSent{0};
    // messages with no key are packed to the same batch and this batch has no key
    // the broker uses `NON_KEY` as the key when dispatching messages from this batch
    for (int i = 0; i < numMessages; i++) {
        producer_.sendAsync(MessageBuilder().setContent("x").build(),
                            [&numMessageSent](Result result, const MessageId&) {
                                ASSERT_EQ(result, ResultOk);
                                ++numMessageSent;
                            });
    }

    Message msg;
    for (int i = 0; i < numMessages; i++) {
        receiveAndAck(msg);
    }
    ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000));
    ASSERT_EQ(numMessageSent.load(), numMessages);
}

TEST_F(KeyBasedBatchingTest, testCloseBeforeSend) {
    initTopicName("CloseBeforeSend");
    // Any asynchronous send won't be completed unless `close()` or `flush()` is triggered
    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned>(-1)));

    std::mutex mtx;
    std::vector<Result> results;
    auto saveResult = [&mtx, &results](Result result) {
        std::lock_guard<std::mutex> lock(mtx);
        results.emplace_back(result);
    };
    auto sendAsync = [saveResult, this](const std::string& key, const std::string& value) {
        producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
                            [saveResult](Result result, const MessageId& id) { saveResult(result); });
    };

    constexpr int numKeys = 10;
    for (int i = 0; i < numKeys; i++) {
        sendAsync("key-" + std::to_string(i), "value");
    }

    ASSERT_EQ(ResultOk, producer_.close());

    // After close() completed, all callbacks should have failed with ResultAlreadyClosed
    std::lock_guard<std::mutex> lock(mtx);
    ASSERT_EQ(results.size(), numKeys);
    for (int i = 0; i < numKeys; i++) {
        ASSERT_EQ(results[i], ResultAlreadyClosed) << " results[" << i << "] is " << results[i];
    }
}
