blob: c7e5aa0f8cc058c24ff813ca5e256081596fe86c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Run `docker-compose up -d` to set up the test environment for this test.
#include <gtest/gtest.h>
#include <thread>
#include <unordered_set>
#include "include/pulsar/Client.h"
#include "lib/LogUtils.h"
#include "lib/Semaphore.h"
#include "lib/TimeUtils.h"
#include "tests/HttpHelper.h"
#include "tests/PulsarFriend.h"
// Declares the logger used by the LOG_INFO statements in this file.
// NOTE(review): the macro is presumably provided by lib/LogUtils.h -- confirm.
DECLARE_LOG_OBJECT()
// The test below refers to the client types (Client, Producer, Consumer, ...) without qualification.
using namespace pulsar;
// Reports whether the test is still inside its overall time budget.
//
// The reference instant is captured on the first call (function-local static), so the budget is
// measured from the first invocation of this function.
//
// Returns true while less than 300 seconds have elapsed since that first call.
bool checkTime() {
    // steady_clock is monotonic; high_resolution_clock may be an alias of system_clock, in which case a
    // wall-clock adjustment during the run would shorten or lengthen the budget.
    using Clock = std::chrono::steady_clock;
    constexpr std::chrono::seconds timeBudget{300};
    static const Clock::time_point firstCall = Clock::now();
    return Clock::now() - firstCall < timeBudget;
}
// Checks that one producer and one consumer keep publishing/receiving every message while the bundle
// that holds their topic is unloaded twice through the blue admin endpoint, the cluster is marked as
// migrated, and the bundle is unloaded a third time through the green admin endpoint.
//
// Three threads cooperate:
//  - the producer thread sends msgCount messages and pauses at fixed indexes on semaphores,
//  - the consumer thread receives and acknowledges messages until everything was consumed,
//  - the test thread issues the admin REST requests and releases the producer step by step.
//
// Every send/receive/ack must finish with ResultOk in less than maxWaitTime, and the sets of produced
// and consumed payloads must both contain exactly 0..msgCount-1 at the end.
TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
    constexpr auto maxWaitTime = std::chrono::seconds(5);
    constexpr long maxWaitTimeMs = maxWaitTime.count() * 1000L;

    const static std::string blueAdminUrl = "http://localhost:8080/";
    const static std::string greenAdminUrl = "http://localhost:8081/";
    const static std::string topicNameSuffix = std::to_string(time(nullptr));
    const static std::string topicName = "persistent://public/unload-test/topic-" + topicNameSuffix;
    // Admin lookup path of the topic under test. It has to spell the topic exactly like topicName
    // (including the '-' before the suffix); otherwise the lookup is answered for a different topic.
    const static std::string topicLookupPath =
        "lookup/v2/topic/persistent/public/unload-test/topic-" + topicNameSuffix;

    // Reset the migration state of cluster-a so that the run starts with migrated=false.
    ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
        std::string url = blueAdminUrl + "admin/v2/clusters/cluster-a/migrate?migrated=false";
        int res = makePostRequest(url, R"(
{
"serviceUrl": "http://localhost:8081",
"serviceUrlTls":"https://localhost:8085",
"brokerServiceUrl": "pulsar://localhost:6651",
"brokerServiceUrlTls": "pulsar+ssl://localhost:6655"
})");
        LOG_INFO("res:" << res);
        return res == 200;
    }));

    // Create the single-bundle namespace on both admin endpoints; 409 is accepted as well as 204 so
    // that a namespace left over from an earlier run does not fail the test.
    ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
        std::string url = blueAdminUrl + "admin/v2/namespaces/public/unload-test?bundles=1";
        int res = makePutRequest(url, "");
        return res == 204 || res == 409;
    }));
    ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
        std::string url = greenAdminUrl + "admin/v2/namespaces/public/unload-test?bundles=1";
        int res = makePutRequest(url, "");
        return res == 204 || res == 409;
    }));

    ClientConfiguration conf;
    conf.setIOThreads(8);
    Client client{"pulsar://localhost:6650", conf};

    Producer producer;
    ProducerConfiguration producerConfiguration;
    Result producerResult = client.createProducer(topicName, producerConfiguration, producer);
    ASSERT_EQ(producerResult, ResultOk);

    Consumer consumer;
    Result consumerResult = client.subscribe(topicName, "sub", consumer);
    ASSERT_EQ(consumerResult, ResultOk);

    // Hand-off points between the test thread and the producer thread.
    Semaphore unloadSemaphore(0);     // released by the test thread after each unload
    Semaphore pubWaitSemaphore(0);    // released by the producer once it is publishing again
    Semaphore migrationSemaphore(0);  // released by the test thread right before the migration

    const int msgCount = 20;
    SynchronizedHashMap<int, int> producedMsgs;

    // Producer thread body: sends the payloads "0".."msgCount-1" one by one.
    auto produce = [&]() {
        int i = 0;
        while (i < msgCount && checkTime()) {
            // Messages 3, 8 and 17 are only sent after the first, second and third unload finished.
            if (i == 3 || i == 8 || i == 17) {
                unloadSemaphore.acquire();
            }
            // Reaching 5 / 15 tells the test thread that publishing resumed, so the next unload
            // starts while this thread keeps sending.
            if (i == 5 || i == 15) {
                pubWaitSemaphore.release();
            }
            // Message 12 is held back until the test thread is about to migrate the cluster.
            if (i == 12) {
                migrationSemaphore.acquire();
            }

            std::string content = std::to_string(i);
            const auto msg = MessageBuilder().setContent(content).build();

            auto start = TimeUtils::currentTimeMillis();
            Result sendResult = producer.send(msg);
            auto elapsed = TimeUtils::currentTimeMillis() - start;
            LOG_INFO("produce i: " << i << " " << elapsed << " ms");
            ASSERT_EQ(sendResult, ResultOk);
            ASSERT_TRUE(elapsed < maxWaitTimeMs);

            producedMsgs.emplace(i, i);
            i++;
        }
        LOG_INFO("producer finished");
    };

    std::atomic<bool> stopConsumer(false);
    SynchronizedHashMap<int, int> consumedMsgs;

    // Consumer thread body: receives and acknowledges until the test thread asked it to stop and
    // every message was both produced and consumed.
    auto consume = [&]() {
        Message receivedMsg;
        while (checkTime()) {
            if (stopConsumer && producedMsgs.size() == msgCount && consumedMsgs.size() == msgCount) {
                break;
            }

            auto start = TimeUtils::currentTimeMillis();
            Result receiveResult = consumer.receive(receivedMsg, maxWaitTimeMs);
            auto elapsed = TimeUtils::currentTimeMillis() - start;
            // Validate the result before parsing the payload: std::stoi throws when the text is not a
            // number, and an exception escaping this thread would terminate the whole test binary
            // instead of being reported as a test failure.
            ASSERT_EQ(receiveResult, ResultOk) << "receive failed after " << elapsed << " ms";
            int i = std::stoi(receivedMsg.getDataAsString());
            LOG_INFO("receive i: " << i << " " << elapsed << " ms");
            ASSERT_TRUE(elapsed < maxWaitTimeMs);

            start = TimeUtils::currentTimeMillis();
            Result ackResult = consumer.acknowledge(receivedMsg);
            elapsed = TimeUtils::currentTimeMillis() - start;
            LOG_INFO("acked i:" << i << " " << elapsed << " ms");
            ASSERT_TRUE(elapsed < maxWaitTimeMs);
            ASSERT_EQ(ackResult, ResultOk);

            consumedMsgs.emplace(i, i);
        }
        LOG_INFO("consumer finished");
    };

    // NOTE(review): from here until the join() calls, a failing ASSERT_* placed directly in the test
    // body returns while both threads are still joinable, and destroying a joinable std::thread
    // calls std::terminate. ASSERT_* inside the lambdas only leaves the lambda.
    std::thread produceThread(produce);
    std::thread consumeThread(consume);

    // Unloads the namespace's only bundle to the broker that does not own it right now and repeats
    // the whole procedure until one unload completed during which the client's lookup counter did not
    // change. `migrated` selects the green admin endpoint and the "green-" broker name prefix.
    auto unload = [&](bool migrated) {
        const std::string &adminUrl = migrated ? greenAdminUrl : blueAdminUrl;
        auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
        auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
        auto &producerImpl = PulsarFriend::getProducerImpl(producer);
        const std::string lookupUrl = adminUrl + topicLookupPath;
        uint64_t lookupCountBeforeUnload = 0;
        std::string destinationBroker;

        while (checkTime()) {
            // make sure producers and consumers are ready
            ASSERT_TRUE(waitUntil(maxWaitTime,
                                  [&] { return consumerImpl.isConnected() && producerImpl.isConnected(); }));

            std::string responseDataBeforeUnload;
            int res = makeGetRequest(lookupUrl, responseDataBeforeUnload);
            if (res != 200) {
                continue;
            }

            // Pick whichever of the two brokers is not named in the lookup response.
            std::string prefix = migrated ? "green-" : "";
            destinationBroker =
                prefix + (responseDataBeforeUnload.find("broker-2") == std::string::npos ? "broker-2:8080"
                                                                                         : "broker-1:8080");
            lookupCountBeforeUnload = clientImplPtr->getLookupCount();
            ASSERT_TRUE(lookupCountBeforeUnload > 0);

            const std::string unloadUrl =
                adminUrl +
                "admin/v2/namespaces/public/unload-test/0x00000000_0xffffffff/unload?destinationBroker=" +
                destinationBroker;
            LOG_INFO("before lookup responseData:" << responseDataBeforeUnload << ",unload url:" << unloadUrl
                                                   << ",lookupCountBeforeUnload:" << lookupCountBeforeUnload);
            res = makePutRequest(unloadUrl, "");
            LOG_INFO("unload res:" << res);
            if (res != 204) {
                continue;
            }

            // make sure producers and consumers are ready
            ASSERT_TRUE(waitUntil(maxWaitTime,
                                  [&] { return consumerImpl.isConnected() && producerImpl.isConnected(); }));

            // Wait until the admin lookup reports the destination broker for the topic.
            std::string responseDataAfterUnload;
            ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
                res = makeGetRequest(lookupUrl, responseDataAfterUnload);
                return res == 200 && responseDataAfterUnload.find(destinationBroker) != std::string::npos;
            }));
            LOG_INFO("after lookup responseData:" << responseDataAfterUnload << ",res:" << res);

            // A changed counter means the client performed a lookup while the bundle moved; try again.
            auto lookupCountAfterUnload = clientImplPtr->getLookupCount();
            if (lookupCountBeforeUnload != lookupCountAfterUnload) {
                continue;
            }
            break;
        }
    };

    LOG_INFO("#### starting first unload ####");
    unload(false);
    unloadSemaphore.release();
    pubWaitSemaphore.acquire();

    LOG_INFO("#### starting second unload ####");
    unload(false);
    unloadSemaphore.release();

    LOG_INFO("#### migrating the cluster ####");
    migrationSemaphore.release();
    ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
        std::string url = blueAdminUrl + "admin/v2/clusters/cluster-a/migrate?migrated=true";
        int res = makePostRequest(url, R"({
"serviceUrl": "http://localhost:8081",
"serviceUrlTls":"https://localhost:8085",
"brokerServiceUrl": "pulsar://localhost:6651",
"brokerServiceUrlTls": "pulsar+ssl://localhost:6655"
})");
        LOG_INFO("res:" << res);
        return res == 200;
    }));

    // After the migration both connections have to point at port 6651, the brokerServiceUrl that was
    // sent in the migration request.
    ASSERT_TRUE(waitUntil(maxWaitTime, [&] {
        auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
        auto &producerImpl = PulsarFriend::getProducerImpl(producer);
        auto consumerConnAddress = PulsarFriend::getConnectionPhysicalAddress(consumerImpl);
        auto producerConnAddress = PulsarFriend::getConnectionPhysicalAddress(producerImpl);
        return consumerImpl.isConnected() && producerImpl.isConnected() &&
               consumerConnAddress.find("6651") != std::string::npos &&
               producerConnAddress.find("6651") != std::string::npos;
    }));
    pubWaitSemaphore.acquire();

    LOG_INFO("#### starting third unload after migration ####");
    unload(true);
    unloadSemaphore.release();

    stopConsumer = true;
    consumeThread.join();
    produceThread.join();

    ASSERT_EQ(producedMsgs.size(), msgCount);
    ASSERT_EQ(consumedMsgs.size(), msgCount);
    // Removing the keys 0..msgCount-1 must empty both maps, i.e. exactly those payloads were seen.
    for (int i = 0; i < msgCount; i++) {
        producedMsgs.remove(i);
        consumedMsgs.remove(i);
    }
    ASSERT_EQ(producedMsgs.size(), 0);
    ASSERT_EQ(consumedMsgs.size(), 0);

    client.close();
    ASSERT_TRUE(checkTime()) << "timed out";
}
// Entry point of the test binary: passes the command line to GoogleTest, runs the registered tests
// and uses the value returned by RUN_ALL_TESTS() as the process exit code.
int main(int argc, char *argv[]) {
    ::testing::InitGoogleTest(&argc, argv);
    const int testStatus = RUN_ALL_TESTS();
    return testStatus;
}