/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include <BinaryProtoLookupService.h>
#include <Future.h>
#include <HTTPLookupService.h>
#include <Utils.h>
#include <gtest/gtest.h>
#include <pulsar/Authentication.h>
#include <pulsar/Client.h>

#include <algorithm>
#include <boost/exception/all.hpp>
#include <future>
#include <stdexcept>

#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "lib/ClientConnection.h"
#include "lib/ConnectionPool.h"
#include "lib/LogUtils.h"
#include "lib/RetryableLookupService.h"
#include "lib/TimeUtils.h"

using namespace pulsar;

DECLARE_LOG_OBJECT()

static std::string binaryLookupUrl = "pulsar://localhost:6650";
static std::string httpLookupUrl = "http://localhost:8080";

extern std::string unique_str();

TEST(LookupServiceTest, basicLookup) {
    ExecutorServiceProviderPtr service = std::make_shared<ExecutorServiceProvider>(1);
    AuthenticationPtr authData = AuthFactory::Disabled();
    std::string url = "pulsar://localhost:6650";
    ClientConfiguration conf;
    ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
    ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, "");
    ServiceNameResolver serviceNameResolver(url);
    BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf);

    TopicNamePtr topicName = TopicName::get("topic");

    Future<Result, LookupDataResultPtr> partitionFuture = lookupService.getPartitionMetadataAsync(topicName);
    LookupDataResultPtr lookupData;
    Result result = partitionFuture.get(lookupData);
    ASSERT_TRUE(lookupData != NULL);
    ASSERT_EQ(0, lookupData->getPartitions());

    const auto topicNamePtr = TopicName::get("topic");
    auto future = lookupService.getBroker(*topicNamePtr);
    LookupService::LookupResult lookupResult;
    result = future.get(lookupResult);

    ASSERT_EQ(ResultOk, result);
    ASSERT_EQ(url, lookupResult.logicalAddress);
    ASSERT_EQ(url, lookupResult.physicalAddress);
}

static void testMultiAddresses(LookupService& lookupService) {
    std::vector<Result> results;
    constexpr int numRequests = 6;

    auto verifySuccessCount = [&results] {
        // Only half of them succeeded
        ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultOk), numRequests / 2);
        ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultRetryable), numRequests / 2);
    };

    for (int i = 0; i < numRequests; i++) {
        const auto topicNamePtr = TopicName::get("topic");
        LookupService::LookupResult lookupResult;
        const auto result = lookupService.getBroker(*topicNamePtr).get(lookupResult);
        LOG_INFO("getBroker [" << i << "] " << result << ", " << lookupResult);
        results.emplace_back(result);
    }
    verifySuccessCount();

    results.clear();
    for (int i = 0; i < numRequests; i++) {
        LookupDataResultPtr data;
        const auto result = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data);
        LOG_INFO("getPartitionMetadataAsync [" << i << "] " << result);
        results.emplace_back(result);
    }
    verifySuccessCount();

    results.clear();
    for (int i = 0; i < numRequests; i++) {
        NamespaceTopicsPtr data;
        const auto result = lookupService
                                .getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName(),
                                                           CommandGetTopicsOfNamespace_Mode_PERSISTENT)
                                .get(data);
        LOG_INFO("getTopicsOfNamespaceAsync [" << i << "] " << result);
        results.emplace_back(result);
    }
    verifySuccessCount();
}

TEST(LookupServiceTest, testMultiAddresses) {
    ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), AuthFactory::Disabled(), true, "");
    ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999");
    ClientConfiguration conf;
    BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, conf);
    testMultiAddresses(binaryLookupService);

    // HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test
    ServiceNameResolver serviceNameResolverForHttp("http://localhost,localhost:9999");
    auto httpLookupServicePtr = std::make_shared<HTTPLookupService>(
        std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled());
    testMultiAddresses(*httpLookupServicePtr);
}
TEST(LookupServiceTest, testRetry) {
    auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true, "");
    ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost");
    ClientConfiguration conf;

    auto lookupService = RetryableLookupService::create(
        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, conf), 30 /* seconds */,
        executorProvider);

    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
    auto topicNamePtr = TopicName::get("lookup-service-test-retry");
    auto future1 = lookupService->getBroker(*topicNamePtr);
    LookupService::LookupResult lookupResult;
    ASSERT_EQ(ResultOk, future1.get(lookupResult));
    LOG_INFO("getBroker returns logicalAddress: " << lookupResult.logicalAddress
                                                  << ", physicalAddress: " << lookupResult.physicalAddress);

    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
    auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
    LookupDataResultPtr lookupDataResultPtr;
    ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr));
    LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions");

    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
    auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(),
                                                            CommandGetTopicsOfNamespace_Mode_PERSISTENT);
    NamespaceTopicsPtr namespaceTopicsPtr;
    ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr));
    LOG_INFO("getTopicPartitionName Async returns " << namespaceTopicsPtr->size() << " topics");

    std::atomic_int retryCount{0};
    constexpr int totalRetryCount = 3;
    auto future4 = lookupService->executeAsync<int>("key", [&retryCount]() -> Future<Result, int> {
        Promise<Result, int> promise;
        if (++retryCount < totalRetryCount) {
            LOG_INFO("Retry count: " << retryCount);
            promise.setFailed(ResultRetryable);
        } else {
            LOG_INFO("Retry done with " << retryCount << " times");
            promise.setValue(100);
        }
        return promise.getFuture();
    });
    int customResult = 0;
    ASSERT_EQ(ResultOk, future4.get(customResult));
    ASSERT_EQ(customResult, 100);
    ASSERT_EQ(retryCount.load(), totalRetryCount);

    ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
}

TEST(LookupServiceTest, testTimeout) {
    auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true, "");
    ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
    ClientConfiguration conf;

    constexpr int timeoutInSeconds = 2;
    auto lookupService = RetryableLookupService::create(
        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, conf), timeoutInSeconds,
        executorProvider);
    auto topicNamePtr = TopicName::get("lookup-service-test-retry");

    decltype(std::chrono::high_resolution_clock::now()) startTime;
    auto beforeMethod = [&startTime] { startTime = std::chrono::high_resolution_clock::now(); };
    auto afterMethod = [&startTime](const std::string& name) {
        auto timeInterval = std::chrono::duration_cast<std::chrono::milliseconds>(
                                std::chrono::high_resolution_clock::now() - startTime)
                                .count();
        LOG_INFO(name << " took " << timeInterval << " seconds");
        ASSERT_TRUE(timeInterval >= timeoutInSeconds * 1000L);
    };

    beforeMethod();
    auto future1 = lookupService->getBroker(*topicNamePtr);
    LookupService::LookupResult lookupResult;
    ASSERT_EQ(ResultTimeout, future1.get(lookupResult));
    afterMethod("getBroker");

    beforeMethod();
    auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
    LookupDataResultPtr lookupDataResultPtr;
    ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr));
    afterMethod("getPartitionMetadataAsync");

    beforeMethod();
    auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(),
                                                            CommandGetTopicsOfNamespace_Mode_PERSISTENT);
    NamespaceTopicsPtr namespaceTopicsPtr;
    ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr));
    afterMethod("getTopicsOfNamespaceAsync");

    ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
}

class LookupServiceTest : public ::testing::TestWithParam<std::string> {
   public:
    void SetUp() override { client_ = Client{GetParam()}; }
    void TearDown() override { client_.close(); }

   protected:
    Client client_{httpLookupUrl};
};

TEST_P(LookupServiceTest, basicGetNamespaceTopics) {
    Result result;

    auto nsName = NamespaceName::get("public", GetParam().substr(0, 4) + std::to_string(time(nullptr)));
    std::string topicName1 = "persistent://" + nsName->toString() + "/basicGetNamespaceTopics1";
    std::string topicName2 = "persistent://" + nsName->toString() + "/basicGetNamespaceTopics2";
    std::string topicName3 = "non-persistent://" + nsName->toString() + "/basicGetNamespaceTopics3";

    // 0. create a namespace
    auto createNsUrl = httpLookupUrl + "/admin/v2/namespaces/" + nsName->toString();
    auto res = makePutRequest(createNsUrl, "");
    ASSERT_FALSE(res != 204 && res != 409);

    // 1. trigger auto create topic
    Producer producer1;
    result = client_.createProducer(topicName1, producer1);
    ASSERT_EQ(ResultOk, result);
    Producer producer2;
    result = client_.createProducer(topicName2, producer2);
    ASSERT_EQ(ResultOk, result);
    Producer producer3;
    result = client_.createProducer(topicName3, producer3);
    ASSERT_EQ(ResultOk, result);

    // 2. verify getTopicsOfNamespace by regex mode.
    auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_)->getLookup();
    auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode,
                               const std::set<std::string>& expectedTopics) {
        Future<Result, NamespaceTopicsPtr> getTopicsFuture =
            lookupServicePtr->getTopicsOfNamespaceAsync(nsName, mode);
        NamespaceTopicsPtr topicsData;
        result = getTopicsFuture.get(topicsData);
        ASSERT_EQ(ResultOk, result);
        ASSERT_TRUE(topicsData != NULL);
        std::set<std::string> actualTopics(topicsData->begin(), topicsData->end());
        ASSERT_EQ(expectedTopics, actualTopics);
    };
    verifyGetTopics(CommandGetTopicsOfNamespace_Mode_PERSISTENT, {topicName1, topicName2});
    verifyGetTopics(CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT, {topicName3});
    verifyGetTopics(CommandGetTopicsOfNamespace_Mode_ALL, {topicName1, topicName2, topicName3});

    client_.close();
}

TEST_P(LookupServiceTest, testGetSchema) {
    const std::string topic = "testGetSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);
    std::string jsonSchema =
        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";

    StringMap properties;
    properties.emplace("key1", "value1");
    properties.emplace("key2", "value2");

    ProducerConfiguration producerConfiguration;
    producerConfiguration.setSchema(SchemaInfo(SchemaType::JSON, "json", jsonSchema, properties));
    Producer producer;
    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer));

    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
    auto lookup = clientImplPtr->getLookup();

    SchemaInfo schemaInfo;
    auto future = lookup->getSchema(TopicName::get(topic));
    ASSERT_EQ(ResultOk, future.get(schemaInfo));
    ASSERT_EQ(jsonSchema, schemaInfo.getSchema());
    ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType());
    ASSERT_EQ(properties, schemaInfo.getProperties());
}

TEST_P(LookupServiceTest, testGetSchemaNotFound) {
    const std::string topic =
        "testGetSchemaNotFund" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);

    Producer producer;
    ASSERT_EQ(ResultOk, client_.createProducer(topic, producer));

    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
    auto lookup = clientImplPtr->getLookup();

    SchemaInfo schemaInfo;
    auto future = lookup->getSchema(TopicName::get(topic));
    ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo));
}

TEST_P(LookupServiceTest, testGetKeyValueSchema) {
    const std::string topic =
        "testGetKeyValueSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);
    StringMap properties;
    properties.emplace("key1", "value1");
    properties.emplace("key2", "value2");
    std::string jsonSchema =
        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
    SchemaInfo keySchema(JSON, "key-json", jsonSchema, properties);
    SchemaInfo valueSchema(JSON, "value-json", jsonSchema, properties);
    SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);

    ProducerConfiguration producerConfiguration;
    producerConfiguration.setSchema(keyValueSchema);
    Producer producer;
    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer));

    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
    auto lookup = clientImplPtr->getLookup();

    SchemaInfo schemaInfo;
    auto future = lookup->getSchema(TopicName::get(topic));
    ASSERT_EQ(ResultOk, future.get(schemaInfo));
    ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema());
    ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType());
    ASSERT_FALSE(schemaInfo.getProperties().empty());
}

TEST_P(LookupServiceTest, testGetSchemaByVersion) {
    const auto topic = "testGetSchemaByVersion" + unique_str() + GetParam().substr(0, 4);
    const std::string schema1 = R"({
  "type": "record",
  "name": "User",
  "namespace": "test",
  "fields": [
    {"name": "name", "type": ["null", "string"]},
    {"name": "age", "type": "int"}
  ]
})";
    const std::string schema2 = R"({
  "type": "record",
  "name": "User",
  "namespace": "test",
  "fields": [
    {"name": "age", "type": "int"},
    {"name": "name", "type": ["null", "string"]}
  ]
})";
    ProducerConfiguration producerConf1;
    producerConf1.setSchema(SchemaInfo{AVRO, "Avro", schema1});
    Producer producer1;
    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf1, producer1));
    ProducerConfiguration producerConf2;
    producerConf2.setSchema(SchemaInfo{AVRO, "Avro", schema2});
    Producer producer2;
    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf2, producer2));

    // Though these messages are invalid, the C++ client can send them successfully
    producer1.send(MessageBuilder().setContent("msg0").build());
    producer2.send(MessageBuilder().setContent("msg1").build());

    ConsumerConfiguration consumerConf;
    consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
    Consumer consumer;
    ASSERT_EQ(ResultOk, client_.subscribe(topic, "sub", consumerConf, consumer));

    Message msg1;
    ASSERT_EQ(ResultOk, consumer.receive(msg1, 3000));
    Message msg2;
    ASSERT_EQ(ResultOk, consumer.receive(msg2, 3000));

    auto getSchemaInfo = [this](const std::string& topic, int64_t version) {
        std::promise<SchemaInfo> p;
        client_.getSchemaInfoAsync(topic, version, [&p](Result result, const SchemaInfo& info) {
            if (result == ResultOk) {
                p.set_value(info);
            } else {
                p.set_exception(std::make_exception_ptr(std::runtime_error(strResult(result))));
            }
        });
        return p.get_future().get();
    };
    {
        ASSERT_EQ(msg1.getLongSchemaVersion(), 0);
        const auto info = getSchemaInfo(topic, 0);
        ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO);
        ASSERT_EQ(info.getSchema(), schema1);
    }
    {
        ASSERT_EQ(msg2.getLongSchemaVersion(), 1);
        const auto info = getSchemaInfo(topic, 1);
        ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO);
        ASSERT_EQ(info.getSchema(), schema2);
    }
    {
        const auto info = getSchemaInfo(topic, -1);
        ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO);
        ASSERT_EQ(info.getSchema(), schema2);
    }
    try {
        getSchemaInfo(topic, 2);
        FAIL();
    } catch (const std::runtime_error& e) {
        ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound));
    }
    try {
        getSchemaInfo(topic + "-not-exist", 0);
        FAIL();
    } catch (const std::runtime_error& e) {
        ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound));
    }

    consumer.close();
    producer1.close();
    producer2.close();
}

INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl));

class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService {
   public:
    BinaryProtoLookupServiceRedirectTestHelper(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
                                               const ClientConfiguration& clientConfiguration)
        : BinaryProtoLookupService(serviceNameResolver, pool, clientConfiguration) {}

    LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
                                  size_t redirectCount) {
        return BinaryProtoLookupService::findBroker(address, authoritative, topic, redirectCount);
    }
};  // class BinaryProtoLookupServiceRedirectTestHelper

TEST(LookupServiceTest, testRedirectionLimit) {
    const auto redirect_limit = 5;
    AuthenticationPtr authData = AuthFactory::Disabled();
    ClientConfiguration conf;
    conf.setMaxLookupRedirects(redirect_limit);
    ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
    ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, "");
    std::string url = "pulsar://localhost:6650";
    ServiceNameResolver serviceNameResolver(url);
    BinaryProtoLookupServiceRedirectTestHelper lookupService(serviceNameResolver, pool_, conf);

    const auto topicNamePtr = TopicName::get("topic");
    for (auto idx = 0; idx < redirect_limit + 5; ++idx) {
        auto future =
            lookupService.findBroker(serviceNameResolver.resolveHost(), false, topicNamePtr->toString(), idx);
        LookupService::LookupResult lookupResult;
        auto result = future.get(lookupResult);

        if (idx <= redirect_limit) {
            ASSERT_EQ(ResultOk, result);
            ASSERT_EQ(url, lookupResult.logicalAddress);
            ASSERT_EQ(url, lookupResult.physicalAddress);
        } else {
            ASSERT_EQ(ResultTooManyLookupRequestException, result);
        }
    }
}
