blob: a05e6a39e6223a9a9c67ce90c8fc98e175066dc8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <sys/resource.h>
#include <time.h>
#include "../HttpHelper.h"
using namespace pulsar;
class ConnectionFailTest : public ::testing::TestWithParam<int> {
public:
void SetUp() override {
struct rlimit limit;
ASSERT_EQ(getrlimit(RLIMIT_NOFILE, &limit), 0);
maxFdCount_ = limit.rlim_max;
int numPartitions = GetParam();
topic_ = "test-connection-fail-" + std::to_string(numPartitions) + std::to_string(time(nullptr));
if (numPartitions > 0) {
int res = makePutRequest(
"http://localhost:8080/admin/v2/persistent/public/default/" + topic_ + "/partitions",
std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
}
}
void TearDown() override { client_.close(); }
protected:
Client client_{"pulsar://localhost:6650"};
std::string topic_;
int maxFdCount_;
void updateFdLimit(int fdLimit) {
struct rlimit limit;
limit.rlim_cur = fdLimit;
limit.rlim_max = maxFdCount_;
ASSERT_EQ(setrlimit(RLIMIT_NOFILE, &limit), 0);
std::cout << "Updated fd limit to " << limit.rlim_cur << '\n';
}
};
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST_P(ConnectionFailTest, test) {
int fdLimit = 1;
for (; fdLimit < 100; fdLimit++) {
updateFdLimit(fdLimit);
Producer producer;
auto result = client_.createProducer(topic_, producer);
ASSERT_TRUE(result == ResultOk || result == ResultConnectError);
if (result == ResultConnectError) {
continue;
}
Consumer consumer;
result = client_.subscribe(topic_, "sub", consumer);
ASSERT_TRUE(result == ResultOk || result == ResultConnectError);
if (result == ResultConnectError) {
continue;
}
Reader reader;
result = client_.createReader(topic_, MessageId::earliest(), {}, reader);
ASSERT_TRUE(result == ResultOk || result == ResultConnectError);
if (result == ResultOk) {
break;
}
}
std::cout << "Create producer, consumer and reader successfully when fdLimit is " << fdLimit << '\n';
ASSERT_TRUE(fdLimit < 100);
}
INSTANTIATE_TEST_SUITE_P(Unix, ConnectionFailTest, ::testing::Values(0, 5));