blob: 4e9f880d18eb25e4d2ad176697cd745c7c2f7282 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/cdc_client_mgr.h"
#include <gen_cpp/internal_service.pb.h>
#include <gtest/gtest.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <thread>
#include "common/config.h"
#include "common/status.h"
namespace doris {
class CdcClientMgrTest : public testing::Test {
public:
void SetUp() override {
// Save original environment variables
_original_doris_home = getenv("DORIS_HOME");
_original_log_dir = getenv("LOG_DIR");
_original_java_home = getenv("JAVA_HOME");
// Use existing DORIS_HOME
const char* doris_home = std::getenv("DORIS_HOME");
if (doris_home) {
_doris_home = doris_home;
_lib_dir = _doris_home + "/lib/cdc_client";
_jar_path = _lib_dir + "/cdc-client.jar";
// Create lib directory and jar file if they don't exist
[[maybe_unused]] int ret1 = system(("mkdir -p " + _lib_dir).c_str());
// Create a dummy jar file for testing if it doesn't exist
if (access(_jar_path.c_str(), F_OK) != 0) {
[[maybe_unused]] int ret2 = system(("touch " + _jar_path).c_str());
_jar_created = true;
}
}
// Use existing LOG_DIR or set a default
const char* log_dir = std::getenv("LOG_DIR");
if (!log_dir) {
_log_dir = "/tmp/doris_test_log";
setenv("LOG_DIR", _log_dir.c_str(), 1);
_log_dir_set = true;
}
}
void TearDown() override {
// Restore original environment variables
if (_original_doris_home) {
setenv("DORIS_HOME", _original_doris_home, 1);
} else {
unsetenv("DORIS_HOME");
}
if (_original_log_dir) {
setenv("LOG_DIR", _original_log_dir, 1);
} else if (_log_dir_set) {
unsetenv("LOG_DIR");
}
if (_original_java_home) {
setenv("JAVA_HOME", _original_java_home, 1);
} else {
unsetenv("JAVA_HOME");
}
// Clean up created jar file if we created it
if (_jar_created && !_jar_path.empty()) {
[[maybe_unused]] int cleanup_ret = system(("rm -f " + _jar_path).c_str());
}
}
protected:
std::string _doris_home;
std::string _log_dir;
std::string _lib_dir;
std::string _jar_path;
const char* _original_doris_home = nullptr;
const char* _original_log_dir = nullptr;
const char* _original_java_home = nullptr;
bool _jar_created = false;
bool _log_dir_set = false;
};
// Test stop method when there's no child process
TEST_F(CdcClientMgrTest, StopWithoutChild) {
CdcClientMgr mgr;
// Should not crash
mgr.stop();
}
// Test stop when child process is already dead (covers lines 98-111: kill(pid, 0) == 0 is false)
TEST_F(CdcClientMgrTest, StopWhenProcessDead) {
CdcClientMgr mgr;
// Start CDC client (sets PID to 99999 in BE_TEST mode)
PRequestCdcClientResult result;
Status status = mgr.start_cdc_client(&result);
EXPECT_TRUE(status.ok());
EXPECT_GT(mgr.get_child_pid(), 0);
// Stop - since PID 99999 doesn't exist, kill(99999, 0) will fail
// This should trigger the branch where kill(pid, 0) != 0 (process already dead)
mgr.stop();
// PID should be reset to 0
EXPECT_EQ(mgr.get_child_pid(), 0);
}
// Test stop with real process that exits gracefully (covers lines 98-111: graceful shutdown)
TEST_F(CdcClientMgrTest, StopWithRealProcessGraceful) {
CdcClientMgr mgr;
// Use popen to start a background sleep process and get its PID
// This avoids fork() which conflicts with gcov/coverage tools
FILE* pipe = popen("sleep 10 & echo $!", "r");
if (pipe) {
char buffer[128];
if (fgets(buffer, sizeof(buffer), pipe) != nullptr) {
pid_t real_pid = std::atoi(buffer);
pclose(pipe);
if (real_pid > 0) {
// Set the PID in the manager
mgr.set_child_pid_for_test(real_pid);
// Call stop - process will respond to SIGTERM and exit
// This covers the graceful shutdown path
mgr.stop();
// Verify PID is reset
EXPECT_EQ(mgr.get_child_pid(), 0);
// Clean up: make sure child is dead
kill(real_pid, SIGKILL);
waitpid(real_pid, nullptr, WNOHANG);
}
} else {
pclose(pipe);
}
}
}
// Test stop with real process that requires force kill (covers lines 98-111: force kill path)
TEST_F(CdcClientMgrTest, StopWithRealProcessForceKill) {
CdcClientMgr mgr;
// Start a bash process that ignores SIGTERM by trapping it
// This process will not exit on SIGTERM, requiring SIGKILL
const char* script = "bash -c 'trap \"\" TERM; while true; do sleep 1; done' & echo $!";
FILE* pipe = popen(script, "r");
if (pipe) {
char buffer[128];
if (fgets(buffer, sizeof(buffer), pipe) != nullptr) {
pid_t real_pid = std::atoi(buffer);
pclose(pipe);
if (real_pid > 0) {
// Give the process a moment to start
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Set the PID
mgr.set_child_pid_for_test(real_pid);
// Call stop - should try graceful shutdown first, then force kill
// Since process ignores SIGTERM, it will still be alive after 200ms
// This should trigger the force kill path (lines 105-110)
mgr.stop();
// Verify PID is reset
EXPECT_EQ(mgr.get_child_pid(), 0);
// Clean up: make sure child is dead
kill(real_pid, SIGKILL);
waitpid(real_pid, nullptr, WNOHANG);
}
} else {
pclose(pipe);
}
}
}
// Test start_cdc_client with missing jar file
TEST_F(CdcClientMgrTest, StartCdcClientMissingJar) {
[[maybe_unused]] int rm_ret = system(("rm -f " + _jar_path).c_str());
CdcClientMgr mgr;
PRequestCdcClientResult result;
Status status = mgr.start_cdc_client(&result);
EXPECT_FALSE(status.ok());
EXPECT_TRUE(status.to_string().find("cdc-client.jar") != std::string::npos);
EXPECT_TRUE(result.has_status());
[[maybe_unused]] int touch_ret = system(("touch " + _jar_path).c_str());
}
// Test start_cdc_client with missing JAVA_HOME
TEST_F(CdcClientMgrTest, StartCdcClientMissingJavaHome) {
unsetenv("JAVA_HOME");
CdcClientMgr mgr;
PRequestCdcClientResult result;
Status status = mgr.start_cdc_client(&result);
EXPECT_FALSE(status.ok());
EXPECT_TRUE(status.to_string().find("JAVA_HOME") != std::string::npos);
EXPECT_TRUE(result.has_status());
}
// Test multiple calls to start_cdc_client
TEST_F(CdcClientMgrTest, StartCdcClientMultipleTimes) {
CdcClientMgr mgr;
PRequestCdcClientResult result1;
PRequestCdcClientResult result2;
// Initially no child process
EXPECT_EQ(mgr.get_child_pid(), 0);
// First call - should start CDC client
Status status1 = mgr.start_cdc_client(&result1);
EXPECT_TRUE(status1.ok());
pid_t pid_after_first = mgr.get_child_pid();
EXPECT_GT(pid_after_first, 0); // PID should be set
// Second call - should detect already started and not restart
Status status2 = mgr.start_cdc_client(&result2);
EXPECT_TRUE(status2.ok());
EXPECT_EQ(mgr.get_child_pid(), pid_after_first); // PID should not change!
}
// Test destructor calls stop
TEST_F(CdcClientMgrTest, DestructorCallsStop) {
{
CdcClientMgr mgr;
PRequestCdcClientResult result;
[[maybe_unused]] Status status = mgr.start_cdc_client(&result);
// Destructor will be called when mgr goes out of scope
}
// If no crash, test passes
SUCCEED();
}
// Test send_request_to_cdc_client with explicitly empty params (covers params_body.empty() branch)
TEST_F(CdcClientMgrTest, SendRequestExplicitlyEmptyParams) {
CdcClientMgr mgr;
std::string response;
// Explicitly test empty params_body path (line 309)
std::string empty_params = "";
Status status = mgr.send_request_to_cdc_client("/health", empty_params, &response);
// Expected to fail but should not crash
EXPECT_FALSE(status.ok());
}
// Test request_cdc_client_impl when start fails (covers lines 288-291)
TEST_F(CdcClientMgrTest, RequestCdcClientImplStartFailed) {
CdcClientMgr mgr;
PRequestCdcClientRequest request;
PRequestCdcClientResult result;
request.set_api("/test/api");
request.set_params("{}");
// Remove JAVA_HOME to make start_cdc_client fail
unsetenv("JAVA_HOME");
struct TestClosure : public google::protobuf::Closure {
void Run() override { called = true; }
bool called = false;
};
TestClosure closure;
mgr.request_cdc_client_impl(&request, &result, &closure);
// Should have error status in result
EXPECT_TRUE(result.has_status());
EXPECT_NE(result.status().status_code(), 0);
EXPECT_TRUE(closure.called);
// Restore JAVA_HOME
if (_original_java_home) {
setenv("JAVA_HOME", _original_java_home, 1);
}
}
// Test start_cdc_client with result
TEST_F(CdcClientMgrTest, StartCdcClientWithResult) {
CdcClientMgr mgr;
PRequestCdcClientResult result;
EXPECT_EQ(mgr.get_child_pid(), 0);
Status status = mgr.start_cdc_client(&result);
// Should succeed
EXPECT_TRUE(status.ok());
EXPECT_GT(mgr.get_child_pid(), 0); // PID should be set
}
// Test start_cdc_client when environment is missing
TEST_F(CdcClientMgrTest, StartCdcClientMissingEnv) {
unsetenv("JAVA_HOME");
CdcClientMgr mgr;
PRequestCdcClientResult result;
Status status = mgr.start_cdc_client(&result);
EXPECT_FALSE(status.ok());
EXPECT_TRUE(status.to_string().find("JAVA_HOME") != std::string::npos);
EXPECT_TRUE(result.has_status());
// Restore JAVA_HOME
if (_original_java_home) {
setenv("JAVA_HOME", _original_java_home, 1);
}
}
// Test concurrent start attempts
TEST_F(CdcClientMgrTest, ConcurrentStartAttempts) {
CdcClientMgr mgr;
std::vector<std::thread> threads;
std::atomic<int> success_count {0};
// Initially no child process
EXPECT_EQ(mgr.get_child_pid(), 0);
// Launch multiple threads trying to start CDC client
for (int i = 0; i < 5; ++i) {
threads.emplace_back([&mgr, &success_count]() {
PRequestCdcClientResult result;
Status status = mgr.start_cdc_client(&result);
if (status.ok()) {
success_count++;
}
});
}
for (auto& t : threads) {
t.join();
}
// All attempts should succeed (subsequent ones detect already started)
EXPECT_EQ(success_count.load(), 5);
// CDC client should be started exactly once (PID is set)
EXPECT_GT(mgr.get_child_pid(), 0);
}
// Test send_request with various HTTP methods implicitly
TEST_F(CdcClientMgrTest, SendRequestVariousEndpoints) {
CdcClientMgr mgr;
std::string response;
// Test different API endpoints (reduced from 4 to 2 for speed)
std::vector<std::string> apis = {"/actuator/health", "/api/submit"};
for (const auto& api : apis) {
Status status = mgr.send_request_to_cdc_client(api, "{}", &response);
// Expected to fail (no real CDC client running) but should not crash
EXPECT_FALSE(status.ok());
}
}
// Test that stop is idempotent
TEST_F(CdcClientMgrTest, StopIdempotent) {
CdcClientMgr mgr;
// Multiple stops should not cause issues
mgr.stop();
mgr.stop();
mgr.stop();
// Should still be able to start after multiple stops
PRequestCdcClientResult result;
Status status = mgr.start_cdc_client(&result);
EXPECT_TRUE(status.ok());
}
// Test rapid start/stop cycles
TEST_F(CdcClientMgrTest, RapidStartStopCycles) {
CdcClientMgr mgr;
PRequestCdcClientResult result;
for (int i = 0; i < 5; ++i) {
EXPECT_EQ(mgr.get_child_pid(), 0); // Should be stopped
Status status = mgr.start_cdc_client(&result);
EXPECT_TRUE(status.ok());
EXPECT_GT(mgr.get_child_pid(), 0); // Should be started
mgr.stop();
EXPECT_EQ(mgr.get_child_pid(), 0); // Should be stopped again
}
}
// Test that multiple request_cdc_client_impl calls work
TEST_F(CdcClientMgrTest, MultipleRequestCalls) {
CdcClientMgr mgr;
// Reduced from 5 to 2 iterations for speed
for (int i = 0; i < 2; ++i) {
PRequestCdcClientRequest request;
PRequestCdcClientResult result;
struct TestClosure : public google::protobuf::Closure {
void Run() override {}
};
TestClosure closure;
request.set_api("/test");
request.set_params("{\"index\":" + std::to_string(i) + "}");
mgr.request_cdc_client_impl(&request, &result, &closure);
EXPECT_TRUE(result.has_status());
}
}
// Test behavior when DORIS_HOME has trailing slash
TEST_F(CdcClientMgrTest, DorisHomeWithTrailingSlash) {
if (_original_doris_home) {
std::string doris_home_with_slash = std::string(_original_doris_home) + "/";
setenv("DORIS_HOME", doris_home_with_slash.c_str(), 1);
CdcClientMgr mgr;
PRequestCdcClientResult result;
Status status = mgr.start_cdc_client(&result);
// Should still work fine
EXPECT_TRUE(status.ok());
// Restore original
setenv("DORIS_HOME", _original_doris_home, 1);
}
}
// Test send_request_to_cdc_client with very long API path
TEST_F(CdcClientMgrTest, SendRequestLongApiPath) {
CdcClientMgr mgr;
std::string response;
std::string long_api = "/api/v1/very/long/path/to/endpoint/with/many/segments";
Status status = mgr.send_request_to_cdc_client(long_api, "{\"data\":\"test\"}", &response);
EXPECT_FALSE(status.ok());
}
// Test concurrent stop calls
TEST_F(CdcClientMgrTest, ConcurrentStopCalls) {
CdcClientMgr mgr;
// Start CDC client first
PRequestCdcClientResult result;
Status status = mgr.start_cdc_client(&result);
EXPECT_TRUE(status.ok());
std::vector<std::thread> threads;
// Launch multiple threads trying to stop simultaneously
for (int i = 0; i < 3; ++i) {
threads.emplace_back([&mgr]() { mgr.stop(); });
}
for (auto& t : threads) {
t.join();
}
// Should be stopped cleanly
EXPECT_EQ(mgr.get_child_pid(), 0);
}
// Test concurrent requests
TEST_F(CdcClientMgrTest, ConcurrentRequests) {
CdcClientMgr mgr;
std::vector<std::thread> threads;
std::atomic<int> completed {0};
// Launch multiple threads making requests
for (int i = 0; i < 3; ++i) {
threads.emplace_back([&mgr, &completed, i]() {
PRequestCdcClientRequest request;
PRequestCdcClientResult result;
request.set_api("/test/" + std::to_string(i));
request.set_params("{\"id\":" + std::to_string(i) + "}");
struct TestClosure : public google::protobuf::Closure {
void Run() override {}
};
TestClosure closure;
mgr.request_cdc_client_impl(&request, &result, &closure);
completed++;
});
}
for (auto& t : threads) {
t.join();
}
EXPECT_EQ(completed.load(), 3);
}
// Test start after failed start attempt
TEST_F(CdcClientMgrTest, StartAfterFailedStart) {
CdcClientMgr mgr;
PRequestCdcClientResult result1;
// First attempt: remove JAVA_HOME to fail
unsetenv("JAVA_HOME");
Status status1 = mgr.start_cdc_client(&result1);
EXPECT_FALSE(status1.ok());
// Restore JAVA_HOME
if (_original_java_home) {
setenv("JAVA_HOME", _original_java_home, 1);
}
// Second attempt: should succeed now
PRequestCdcClientResult result2;
Status status2 = mgr.start_cdc_client(&result2);
EXPECT_TRUE(status2.ok());
}
// Test send_request with special characters in params
TEST_F(CdcClientMgrTest, SendRequestSpecialCharsInParams) {
CdcClientMgr mgr;
std::string response;
std::string special_params = "{\"data\":\"test\\n\\t\\r\\\"\"}";
Status status = mgr.send_request_to_cdc_client("/test", special_params, &response);
EXPECT_FALSE(status.ok());
}
// Test request_cdc_client_impl with long params
TEST_F(CdcClientMgrTest, RequestWithLongParams) {
CdcClientMgr mgr;
PRequestCdcClientRequest request;
PRequestCdcClientResult result;
request.set_api("/test");
// Create a long params string
std::string long_params = "{\"data\":\"";
for (int i = 0; i < 1000; ++i) {
long_params += "x";
}
long_params += "\"}";
request.set_params(long_params);
struct TestClosure : public google::protobuf::Closure {
void Run() override {}
};
TestClosure closure;
mgr.request_cdc_client_impl(&request, &result, &closure);
EXPECT_TRUE(result.has_status());
}
// Test multiple managers with concurrent operations
TEST_F(CdcClientMgrTest, MultipleManagersConcurrent) {
std::vector<std::unique_ptr<CdcClientMgr>> managers;
std::vector<std::thread> threads;
// Create 3 managers
for (int i = 0; i < 3; ++i) {
managers.push_back(std::make_unique<CdcClientMgr>());
}
// Each manager tries to start CDC client concurrently
for (int i = 0; i < 3; ++i) {
threads.emplace_back([&managers, i]() {
PRequestCdcClientResult result;
Status status = managers[i]->start_cdc_client(&result);
EXPECT_TRUE(status.ok());
});
}
for (auto& t : threads) {
t.join();
}
// All should have succeeded
for (auto& mgr : managers) {
EXPECT_GT(mgr->get_child_pid(), 0);
}
}
// Test start_cdc_client with result having pre-existing status
TEST_F(CdcClientMgrTest, StartWithPreExistingResultStatus) {
CdcClientMgr mgr;
PRequestCdcClientResult result;
// Pre-populate result with some status
auto* status_pb = result.mutable_status();
status_pb->set_status_code(999);
status_pb->add_error_msgs("pre-existing error");
Status status = mgr.start_cdc_client(&result);
// Should succeed
EXPECT_TRUE(status.ok());
// Note: start_cdc_client only updates result status on error, not on success
// So the pre-existing status (999) will remain unchanged
EXPECT_EQ(result.status().status_code(), 999);
}
// Test send_request_to_cdc_client with empty API
TEST_F(CdcClientMgrTest, SendRequestEmptyApi) {
CdcClientMgr mgr;
std::string response;
Status status = mgr.send_request_to_cdc_client("", "{\"test\":\"data\"}", &response);
EXPECT_FALSE(status.ok());
}
} // namespace doris