blob: af0055971e5aa8fdaf4bbda0c42767ab3ea08db0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <google/protobuf/any.pb.h>
#include <gtest/gtest.h>

#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/subprocess/echo_subprocess.h"
#include "kudu/subprocess/server.h"
#include "kudu/subprocess/subprocess.pb.h"
#include "kudu/util/env.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_int32(subprocess_request_queue_size_bytes);
DECLARE_int32(subprocess_response_queue_size_bytes);
DECLARE_int32(subprocess_num_responder_threads);
DECLARE_int32(subprocess_timeout_secs);
using google::protobuf::Any;
using std::make_shared;
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace subprocess {
namespace {
// Creates a subprocess request whose 'request' field is an Any wrapping an
// EchoRequestPB that carries 'payload' and 'sleep_ms' (milliseconds to sleep).
SubprocessRequestPB CreateEchoSubprocessRequestPB(const string& payload,
                                                  int sleep_ms = 0) {
  EchoRequestPB echo;
  echo.set_data(payload);
  echo.set_sleep_ms(sleep_ms);
  SubprocessRequestPB req;
  // Let the parent message own and allocate the Any directly.
  req.mutable_request()->PackFrom(echo);
  return req;
}
// Unpacks the EchoResponsePB carried in 'resp' and compares its data with
// 'expected_msg'.
//
// Returns Status::Corruption if the payload can't be unpacked as an
// EchoResponsePB, or if the echoed data differs from 'expected_msg'.
// Otherwise returns Status::OK().
Status CheckMessage(const SubprocessResponsePB& resp, const string& expected_msg) {
  EchoResponsePB echoed;
  const bool unpacked = resp.response().UnpackTo(&echoed);
  if (!unpacked) {
    return Status::Corruption(Substitute("Failed to unpack echo response: $0",
                                         pb_util::SecureDebugString(resp)));
  }
  const string& actual = echoed.data();
  return actual == expected_msg
      ? Status::OK()
      : Status::Corruption(Substitute("Expected: '$0', got: '$1'",
                                      expected_msg, actual));
}
// Fixed payload used by tests that only need a simple echo message. Declared
// constexpr so the pointer itself can't be reassigned by accident.
constexpr const char* kHello = "hello world";
} // anonymous namespace
// Test fixture that drives a Java echo subprocess (EchoSubprocessMain from
// kudu-subprocess.jar) through a SubprocessServer.
//
// SetUp() creates and initializes 'server_' with the subprocess' default
// options. Tests that change flags or Java options call
// ResetSubprocessServer() afterwards. Tests that need additional,
// independent servers call InitSubprocessServer() directly.
class SubprocessServerTest : public KuduTest {
 public:
  SubprocessServerTest()
      : test_dir_(GetTestDataDirectory()),
        metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
                                                        "subprocess_server-test")) {}

  void SetUp() override {
    KuduTest::SetUp();
    ASSERT_OK(ResetSubprocessServer());
  }

  // Builds a SubprocessServer that launches EchoSubprocessMain, stores it in
  // 'out', and returns the result of its Init().
  //
  // 'java_queue_size' and 'java_parser_threads' are passed to the subprocess
  // as '-q' and '-p' respectively, but only when they are positive.
  // Otherwise the options are omitted. Returns early with an error if the
  // executable path or the Java home can't be determined. Note that 'out' is
  // assigned before Init() runs.
  Status InitSubprocessServer(int java_queue_size,
                              int java_parser_threads,
                              shared_ptr<SubprocessServer>* out) {
    // kudu-subprocess.jar is looked up in the same directory as this binary.
    string exe;
    RETURN_NOT_OK(env_->GetExecutablePath(&exe));
    const string bin_dir = DirName(exe);
    string java_home;
    RETURN_NOT_OK(FindHomeDir("java", bin_dir, &java_home));
    const string pipe_path =
        SubprocessServer::FifoPath(JoinPathSegments(test_dir_, "echo_pipe"));

    vector<string> argv;
    argv.emplace_back(Substitute("$0/bin/java", java_home));
    argv.emplace_back("-cp");
    argv.emplace_back(Substitute("$0/kudu-subprocess.jar", bin_dir));
    argv.emplace_back("org.apache.kudu.subprocess.echo.EchoSubprocessMain");
    argv.emplace_back("-o");
    argv.emplace_back(pipe_path);

    // Only forward the optional settings that were explicitly requested.
    const auto append_if_positive = [&argv](const char* opt, int value) {
      if (value <= 0) {
        return;
      }
      argv.emplace_back(opt);
      argv.emplace_back(std::to_string(value));
    };
    append_if_positive("-q", java_queue_size);
    append_if_positive("-p", java_parser_threads);

    *out = make_shared<SubprocessServer>(env_, pipe_path, std::move(argv),
                                         EchoSubprocessMetrics(metric_entity_));
    return (*out)->Init();
  }

  // Replaces 'server_' with a freshly initialized server so that any newly
  // set flags or Java options take effect.
  Status ResetSubprocessServer(int java_queue_size = 0,
                               int java_parser_threads = 0) {
    return InitSubprocessServer(java_queue_size, java_parser_threads, &server_);
  }

 protected:
  const string test_dir_;
  MetricRegistry metric_registry_;
  scoped_refptr<MetricEntity> metric_entity_;
  shared_ptr<SubprocessServer> server_;
};
// Sends a single request and verifies that its payload is echoed back.
TEST_F(SubprocessServerTest, TestBasicCall) {
  SubprocessRequestPB req = CreateEchoSubprocessRequestPB(kHello);
  SubprocessResponsePB resp;
  ASSERT_OK(server_->Execute(&req, &resp));
  EchoResponsePB echoed;
  const bool unpacked = resp.response().UnpackTo(&echoed);
  ASSERT_TRUE(unpacked);
  ASSERT_EQ(echoed.data(), kHello);
}
// Test sending many requests concurrently from several threads, then verify
// that every response echoes back the payload of its own request.
TEST_F(SubprocessServerTest, TestManyConcurrentCalls) {
  constexpr int kNumThreads = 20;
  constexpr int kNumPerThread = 200;
  const string kEchoDataPrefix = "Do the hokey pokey, turn yourself around!";
  vector<vector<SubprocessRequestPB>> requests(kNumThreads,
      vector<SubprocessRequestPB>(kNumPerThread));
  vector<vector<SubprocessResponsePB>> responses(kNumThreads,
      vector<SubprocessResponsePB>(kNumPerThread));
  for (int t = 0; t < kNumThreads; t++) {
    for (int i = 0; i < kNumPerThread; i++) {
      requests[t][i] = CreateEchoSubprocessRequestPB(
          Substitute("$0: thread $1 idx $2", kEchoDataPrefix, t, i));
    }
  }
  Stopwatch sw(Stopwatch::ALL_THREADS);
  sw.start();
  {
    vector<thread> threads;
    threads.reserve(kNumThreads);
    // Join on scope exit so no thread outlives 'requests'/'responses'.
    SCOPED_CLEANUP({
      for (auto& t : threads) {
        t.join();
      }
    });
    for (int t = 0; t < kNumThreads; t++) {
      // Each thread only touches its own row of 'requests' and 'responses'.
      // A failed ASSERT_OK here only returns from this lambda, not the test.
      threads.emplace_back([&, t] {
        for (int i = 0; i < kNumPerThread; i++) {
          ASSERT_OK(server_->Execute(&requests[t][i], &responses[t][i]));
        }
      });
    }
  }
  sw.stop();
  const double reqs_sent = kNumThreads * kNumPerThread;
  const double elapsed_seconds = sw.elapsed().wall_seconds();
  LOG(INFO) << Substitute("Sent $0 requests in $1 seconds: $2 req/s",
                          reqs_sent, elapsed_seconds, reqs_sent / elapsed_seconds);
  for (int t = 0; t < kNumThreads; t++) {
    for (int i = 0; i < kNumPerThread; i++) {
      // Recover the payload we sent so it can be compared with the echo.
      // Don't ignore an unpack failure: it would silently compare against "".
      EchoRequestPB echo_req;
      ASSERT_TRUE(requests[t][i].request().UnpackTo(&echo_req));
      ASSERT_OK(CheckMessage(responses[t][i], echo_req.data()));
    }
  }
}
// Test when our timeout occurs before adding the call to the outbound queue.
// With a zero-second timeout, the call is expected to fail while enqueueing.
TEST_F(SubprocessServerTest, TestTimeoutBeforeQueueing) {
  FLAGS_subprocess_timeout_secs = 0;
  ASSERT_OK(ResetSubprocessServer());
  SubprocessRequestPB req = CreateEchoSubprocessRequestPB(kHello);
  SubprocessResponsePB resp;
  const Status status = server_->Execute(&req, &resp);
  ASSERT_TRUE(status.IsTimedOut()) << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(), "couldn't enqueue call");
}
// Test when we try sending too many requests at once. The outbound queue and
// the pipe should fill up, so at least one call should time out before it can
// be enqueued.
TEST_F(SubprocessServerTest, TestTimeoutWhileQueueingCalls) {
  // Set a relatively low timeout so our calls time out more easily.
  FLAGS_subprocess_timeout_secs = 1;
  // Set a really low queue size so we can overflow the outbound queue easily.
  FLAGS_subprocess_request_queue_size_bytes = 1;
  // Make the Java subprocess single-threaded so we can overwhelm it more
  // easily.
  ASSERT_OK(ResetSubprocessServer(/*java_queue_size*/1,
                                  /*java_parser_threads*/1));
  // Send a bunch of requests with a sleep that's lower than our timeout.
  // Since we've made the subprocess single-threaded, the sheer number of
  // requests should fill the pipe, and our outbound queue.
  constexpr int kNumRequests = 500;
  const string kLargeRequest(10000, 'x');
  vector<Status> results(kNumRequests);
  vector<thread> threads;
  threads.reserve(kNumRequests);
  for (int i = 0; i < kNumRequests; i++) {
    // Each thread writes only its own slot of 'results'.
    threads.emplace_back([&, i] {
      SubprocessRequestPB request =
          CreateEchoSubprocessRequestPB(kLargeRequest, /*sleep_ms*/500);
      SubprocessResponsePB response;
      results[i] = server_->Execute(&request, &response);
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  const bool has_timeout_when_queueing = std::any_of(
      results.begin(), results.end(), [](const Status& s) {
        return s.IsTimedOut() &&
            s.ToString().find("couldn't enqueue call") != string::npos;
      });
  // We sent a ton of requests and should've overwhelmed the pipe and our
  // outbound queue.
  ASSERT_TRUE(has_timeout_when_queueing) << "expected at least one timeout";
}
// Test when the subprocess takes too long. The request asks the echo
// subprocess to sleep for longer than the server's timeout, so the call is
// expected to time out while in flight.
TEST_F(SubprocessServerTest, TestSlowSubprocessTimesOut) {
  FLAGS_subprocess_timeout_secs = 1;
  ASSERT_OK(ResetSubprocessServer());
  // 1500ms of sleep comfortably exceeds the 1s timeout set above.
  SubprocessRequestPB req = CreateEchoSubprocessRequestPB(kHello, /*sleep_ms*/1500);
  SubprocessResponsePB resp;
  const Status status = server_->Execute(&req, &resp);
  ASSERT_TRUE(status.IsTimedOut()) << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(), "timed out while in flight");
}
// Test calls while shutting down. A call issued concurrently with Shutdown()
// must return, and must not report success.
TEST_F(SubprocessServerTest, TestCallsReturnWhenShuttingDown) {
  Status call_status;
  thread caller([&] {
    SubprocessRequestPB req = CreateEchoSubprocessRequestPB(kHello, /*sleep_ms*/1000);
    SubprocessResponsePB resp;
    call_status = server_->Execute(&req, &resp);
  });
  server_->Shutdown();
  caller.join();
  // There are many places the error could've happened, so we won't check on
  // the exact message or error type.
  ASSERT_FALSE(call_status.ok());
}
// Some usage of a subprocess warrants calling Init() from a short-lived
// thread. Ensure nothing goes wrong when that happens (e.g. the OS must not
// reap the underlying process once the thread that started it exits).
TEST_F(SubprocessServerTest, TestInitFromThread) {
  Status init_status;
  thread initializer([&] { init_status = ResetSubprocessServer(); });
  initializer.join();
  ASSERT_OK(init_status);
  // Wait a bit to give time for the OS to wreak havoc (though it shouldn't).
  SleepFor(MonoDelta::FromSeconds(3));
  SubprocessRequestPB req = CreateEchoSubprocessRequestPB(kHello);
  SubprocessResponsePB resp;
  ASSERT_OK(server_->Execute(&req, &resp));
}
// Test that we've configured our subprocess server such that we can run it
// from multiple threads without having them collide with each other.
TEST_F(SubprocessServerTest, TestRunFromMultipleThreads) {
  constexpr int kNumThreads = 3;
  vector<thread> threads;
  threads.reserve(kNumThreads);
  vector<Status> results(kNumThreads);
  for (int i = 0; i < kNumThreads; i++) {
    threads.emplace_back([&, i] {
      // Run init/execute/check in a Status-returning lambda so RETURN_NOT_OK
      // can bail out at the first error. That error, or OK, is recorded in
      // this thread's own slot of 'results'. This avoids an ad-hoc macro that
      // would otherwise leak into the rest of the translation unit.
      results[i] = [&]() -> Status {
        shared_ptr<SubprocessServer> server;
        RETURN_NOT_OK(InitSubprocessServer(0, 0, &server));
        const string msg = Substitute("$0 bottles of tea on the wall", i);
        SubprocessRequestPB req = CreateEchoSubprocessRequestPB(msg);
        SubprocessResponsePB resp;
        RETURN_NOT_OK(server->Execute(&req, &resp));
        return CheckMessage(resp, msg);
      }();
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  for (const auto& r : results) {
    ASSERT_OK(r);
  }
}
} // namespace subprocess
} // namespace kudu