blob: ca4111f780e0f8bbd52e785c927386ab5df77366 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/subprocess/server.h"

#include <unistd.h>

#include <csignal>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <gflags/gflags.h>

#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/subprocess/subprocess.pb.h"
#include "kudu/subprocess/subprocess_protocol.h"
#include "kudu/util/async_util.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/subprocess.h"
#include "kudu/util/thread.h"
DEFINE_int32(subprocess_request_queue_size_bytes, 4 * 1024 * 1024,
"Maximum size in bytes of the outbound request queue. This is best "
"effort: if a single request is larger than this, it is still "
"added to the queue");
TAG_FLAG(subprocess_request_queue_size_bytes, advanced);
DEFINE_int32(subprocess_response_queue_size_bytes, 4 * 1024 * 1024,
"Maximum size in bytes of the inbound response queue. This is best "
"effort: if a single request is larger than this, it is still "
"added to the queue");
TAG_FLAG(subprocess_response_queue_size_bytes, advanced);
DEFINE_int32(subprocess_num_responder_threads, 3,
"Number of threads that will be dedicated to reading responses "
"from the inbound queue and returning to callers");
TAG_FLAG(subprocess_num_responder_threads, advanced);
DEFINE_int32(subprocess_timeout_secs, 15,
"Number of seconds a call to the subprocess is allowed to "
"take before a timeout error is returned to the calling process");
TAG_FLAG(subprocess_timeout_secs, advanced);
DEFINE_int32(subprocess_queue_full_retry_ms, 50,
"Number of milliseconds between attempts to enqueue the "
"request to the subprocess");
TAG_FLAG(subprocess_queue_full_retry_ms, runtime);
TAG_FLAG(subprocess_queue_full_retry_ms, advanced);
DEFINE_int32(subprocess_deadline_checking_interval_ms, 50,
"Interval in milliseconds at which Kudu will check the deadlines "
"of in-flight calls to the subprocess");
TAG_FLAG(subprocess_deadline_checking_interval_ms, runtime);
TAG_FLAG(subprocess_deadline_checking_interval_ms, advanced);
using std::make_shared;
using std::pair;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace subprocess {
string SubprocessServer::FifoPath(const string& base) {
  // Suffix the base path with the current process ID and thread ID so that
  // concurrent servers (e.g. in tests) don't collide on the same fifo file.
  const string unique_suffix = Substitute("$0.$1", getpid(), Thread::CurrentThreadId());
  return Substitute("$0.$1", base, unique_suffix);
}
// Constructs a server around a not-yet-started subprocess. 'receiver_file' is
// the path at which Init() will create the fifo used to receive responses;
// 'subprocess_argv' is the argv of the child to spawn; 'metrics' holds the
// queue/timing metrics updated by the worker threads.
SubprocessServer::SubprocessServer(Env* env, const string& receiver_file,
                                   vector<string> subprocess_argv,
                                   SubprocessMetrics metrics)
    : call_timeout_(MonoDelta::FromSeconds(FLAGS_subprocess_timeout_secs)),
      next_id_(1),
      // One-shot latch: counted down exactly once, in Shutdown().
      closing_(1),
      env_(env),
      receiver_file_(receiver_file),
      process_(make_shared<Subprocess>(std::move(subprocess_argv))),
      outbound_call_queue_(FLAGS_subprocess_request_queue_size_bytes),
      inbound_response_queue_(FLAGS_subprocess_response_queue_size_bytes),
      metrics_(std::move(metrics)) {
  // Don't share our stdin with the child: Init() takes the child's stdin fd
  // (ReleaseChildStdinFd) as the channel for sending requests. The child's
  // stdout/stderr are inherited from this process.
  process_->ShareParentStdin(false);
  process_->ShareParentStdout(true);
  process_->ShareParentStderr(true);
}
SubprocessServer::~SubprocessServer() {
  // Shutdown() is idempotent (it returns early if the latch was already
  // counted down), so this is safe even if a test shut us down out-of-band.
  Shutdown();
}
void SubprocessServer::StartSubprocessThread(const StatusCallback& cb) {
Status s = process_->Start();
cb(s);
if (PREDICT_TRUE(s.ok())) {
// If we successfully started the process, we should stay alive until we
// shut down.
closing_.Wait();
}
}
// Starts the subprocess and all worker threads (responders, reader, writer,
// deadline checker). Returns the first error encountered; on success the
// server is ready to Execute() calls.
Status SubprocessServer::Init() {
  VLOG(2) << "Starting the subprocess";
  Synchronizer sync;
  auto cb = sync.AsStatusCallback();
  // Spawn the thread that starts the subprocess and then blocks on 'closing_'
  // until shutdown. Store the handle in 'start_thread_' (not 'read_thread_',
  // which is used for the reader thread below) so that Shutdown() can join it.
  RETURN_NOT_OK(Thread::Create("subprocess", "start",
                               [this, &cb]() { this->StartSubprocessThread(cb); },
                               &start_thread_));
  RETURN_NOT_OK_PREPEND(sync.Wait(), "Failed to start subprocess");
  // NOTE: callers should try to ensure each receiver file path is used by a
  // single subprocess.
  if (env_->FileExists(receiver_file_)) {
    RETURN_NOT_OK(env_->DeleteFile(receiver_file_));
  }
  // Open the fifo we'll use for receiving messages from the subprocess.
  RETURN_NOT_OK(env_->NewFifo(receiver_file_, &receiver_fifo_));
  RETURN_NOT_OK(receiver_fifo_->OpenForReads());
  // Start the message protocol: responses arrive on the fifo's read end;
  // requests go out over the child's stdin pipe.
  CHECK(!message_protocol_);
  message_protocol_.reset(new SubprocessProtocol(SubprocessProtocol::SerializationMode::PB,
                                                 SubprocessProtocol::CloseMode::CLOSE_ON_DESTROY,
                                                 receiver_fifo_->read_fd(),
                                                 process_->ReleaseChildStdinFd()));
  const int num_threads = FLAGS_subprocess_num_responder_threads;
  responder_threads_.resize(num_threads);
  for (int i = 0; i < num_threads; i++) {
    RETURN_NOT_OK(Thread::Create("subprocess", "responder",
                                 [this]() { this->ResponderThread(); },
                                 &responder_threads_[i]));
  }
  RETURN_NOT_OK(Thread::Create("subprocess", "reader",
                               [this]() { this->ReceiveMessagesThread(); },
                               &read_thread_));
  RETURN_NOT_OK(Thread::Create("subprocess", "writer",
                               [this]() { this->SendMessagesThread(); },
                               &write_thread_));
  return Thread::Create("subprocess", "deadline-checker",
                        [this]() { this->CheckDeadlinesThread(); },
                        &deadline_checker_);
}
// Sends 'req' to the subprocess and blocks until a response is received into
// 'resp', an error occurs, or the call times out. Assigns the call ID, so
// 'req' must not already have one.
Status SubprocessServer::Execute(SubprocessRequestPB* req,
                                 SubprocessResponsePB* resp) {
  DCHECK(!req->has_id());
  req->set_id(next_id_++);
  Synchronizer sync;
  auto cb = sync.AsStatusCallback();
  // Before adding to the queue, record the size of the call queue.
  metrics_.server_outbound_queue_size_bytes->Increment(outbound_call_queue_.size());
  CallAndTimer call_and_timer = {
      make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} };
  // Capture the deadline before handing the call to the queue. Mentioning
  // 'call_and_timer' in the same argument list as std::move(call_and_timer)
  // is technically fine (the move happens inside BlockingPut, after both
  // arguments are evaluated) but is fragile and trips use-after-move linters.
  const MonoTime deadline = call_and_timer.first->deadline();
  RETURN_NOT_OK_PREPEND(
      outbound_call_queue_.BlockingPut(std::move(call_and_timer), deadline),
      "couldn't enqueue call");
  // Wait for a responder thread (or the deadline checker, or shutdown) to run
  // the call's callback.
  return sync.Wait();
}
// Tears the server down: kills the subprocess, shuts down the queues, joins
// all worker threads, removes the receiver fifo, and fails any calls still
// in flight. Idempotent: only the first invocation does any work.
void SubprocessServer::Shutdown() {
  // Stop further work from happening by killing the subprocess and shutting
  // down the queues.
  if (!closing_.CountDown()) {
    // We may shut down out-of-band in tests; if we've already shut down,
    // there's nothing left to do.
    return;
  }
  // NOTE: ordering isn't too important as long as we shut everything down.
  //
  // Normally the process_ should be started before we reach Shutdown() and the
  // threads below should be running too, except in mock servers because we
  // don't init there. Shutdown() is still called in this case from the
  // destructor though so these checks are necessary.
  if (process_->IsStarted()) {
    WARN_NOT_OK(process_->KillAndWait(SIGTERM), "failed to stop subprocess");
  }
  // Shutting down the queues unblocks any threads waiting on queue operations.
  inbound_response_queue_.Shutdown();
  outbound_call_queue_.Shutdown();
  // We should be able to clean up our threads; they'll see that we're closing,
  // the pipe has been closed, or the queues have been shut down.
  if (write_thread_) {
    write_thread_->Join();
  }
  if (read_thread_) {
    read_thread_->Join();
  }
  if (deadline_checker_) {
    deadline_checker_->Join();
  }
  if (start_thread_) {
    start_thread_->Join();
  }
  for (const auto& t : responder_threads_) {
    t->Join();
  }
  // Delete the receiver fifo.
  receiver_fifo_.reset();
  if (env_->FileExists(receiver_file_)) {
    WARN_NOT_OK(env_->DeleteFile(receiver_file_), "Error deleting receiver file");
  }
  // Call any of the remaining callbacks. Take the in-flight map under the
  // lock, but run the callbacks outside it.
  std::map<CallId, shared_ptr<SubprocessCall>> calls;
  {
    std::lock_guard<simple_spinlock> l(in_flight_lock_);
    calls = std::move(call_by_id_);
  }
  for (const auto& id_and_call : calls) {
    const auto& call = id_and_call.second;
    call->RespondError(Status::ServiceUnavailable("subprocess is shutting down"));
  }
}
// Entry point of the "reader" thread: repeatedly receives response messages
// from the subprocess via the message protocol and puts them on the inbound
// response queue for the responder threads. Exits on EOF (pipe closed) or
// when the inbound queue is shut down.
void SubprocessServer::ReceiveMessagesThread() {
  DCHECK(message_protocol_) << "message protocol is not initialized";
  while (closing_.count() > 0) {
    // Receive a new response from the subprocess.
    SubprocessResponsePB response;
    Status s = message_protocol_->ReceiveMessage(&response);
    if (s.IsEndOfFile()) {
      // The underlying pipe was closed. We're likely shutting down.
      LOG(INFO) << "Received an EOF from the subprocess";
      return;
    }
    // TODO(awong): getting an error here indicates that this server and the
    // underlying subprocess are not in sync (e.g. not speaking the same
    // protocol). We should consider either crashing here, or restarting the
    // subprocess.
    DCHECK(s.ok());
    WARN_NOT_OK(s, "failed to receive response from the subprocess");
    // Before adding to the queue, record the size of the response queue.
    metrics_.server_inbound_queue_size_bytes->Increment(inbound_response_queue_.size());
    // Pair the response with a fresh timer so the responder can measure how
    // long it sat in the inbound queue.
    ResponsePBAndTimer resp_and_timer = { std::move(response), {} };
    if (s.ok() && !inbound_response_queue_.BlockingPut(std::move(resp_and_timer)).ok()) {
      // The queue has been shut down and we should shut down too.
      DCHECK_EQ(0, closing_.count());
      LOG(INFO) << "failed to put response onto inbound queue";
      return;
    }
  }
}
// Entry point of the "responder" threads: drains responses from the inbound
// queue, records their metrics, matches each response to its in-flight call
// by ID, and runs the call's success callback. Exits when the inbound queue
// is shut down.
void SubprocessServer::ResponderThread() {
  Status s;
  do {
    vector<ResponsePBAndTimer> resps;
    // NOTE: since we don't supply a deadline, this will only fail if the queue
    // is shutting down. Also note that even if this fails because we're
    // shutting down, we still populate 'resps' and must run their callbacks.
    s = inbound_response_queue_.BlockingDrainTo(&resps);
    for (auto& resp_and_timer : resps) {
      metrics_.server_inbound_queue_time_ms->Increment(
          resp_and_timer.second.elapsed().ToMilliseconds());
      const auto& resp = resp_and_timer.first;
      // A response without an ID can't be matched to a call; treat it as a
      // fatal protocol violation.
      if (!resp.has_id()) {
        LOG(FATAL) << Substitute("Received invalid response: $0",
                                 pb_util::SecureDebugString(resp));
      }
      // Regardless of whether this call succeeded or not, parse the returned
      // metrics.
      if (PREDICT_TRUE(resp.has_metrics())) {
        const auto& pb = resp.metrics();
        metrics_.sp_inbound_queue_length->Increment(pb.inbound_queue_length());
        metrics_.sp_outbound_queue_length->Increment(pb.outbound_queue_length());
        metrics_.sp_inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
        metrics_.sp_outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
        metrics_.sp_execution_time_ms->Increment(pb.execution_time_ms());
      }
    }
    // Pull the matching calls out of the in-flight map under the lock, but
    // run their callbacks outside it.
    vector<pair<shared_ptr<SubprocessCall>, SubprocessResponsePB>> calls_and_resps;
    calls_and_resps.reserve(resps.size());
    {
      std::lock_guard<simple_spinlock> l(in_flight_lock_);
      for (auto& resp_and_timer : resps) {
        auto& resp = resp_and_timer.first;
        auto id = resp.id();
        auto call = EraseKeyReturnValuePtr(&call_by_id_, id);
        if (call) {
          calls_and_resps.emplace_back(std::move(call), std::move(resp));
        }
      }
    }
    for (auto& call_and_resp : calls_and_resps) {
      call_and_resp.first->RespondSuccess(std::move(call_and_resp.second));
    }
    // If we didn't find our call, it timed out and its callback has
    // already been called by the deadline checker.
  } while (s.ok());
  DCHECK(s.IsAborted());
  DCHECK_EQ(0, closing_.count());
  LOG(INFO) << "get failed, inbound queue shut down: " << s.ToString();
}
void SubprocessServer::CheckDeadlinesThread() {
while (!closing_.WaitFor(
MonoDelta::FromMilliseconds(FLAGS_subprocess_deadline_checking_interval_ms))) {
MonoTime now = MonoTime::Now();
vector<shared_ptr<SubprocessCall>> timed_out_calls;
{
std::lock_guard<simple_spinlock> l(in_flight_lock_);
// NOTE: this is an approximation for age based on ID. That's OK because
// deadline-checking is best-effort.
auto earliest_call_within_deadline = call_by_id_.begin();
for (; earliest_call_within_deadline != call_by_id_.end();
earliest_call_within_deadline++) {
const auto& call = earliest_call_within_deadline->second;
if (now > call->deadline()) {
timed_out_calls.emplace_back(call);
} else {
// This is the call with the earliest deadline that hasn't passed.
break;
}
}
// All calls older than the earliest call whose deadline hasn't passed
// are timed out.
call_by_id_.erase(call_by_id_.begin(), earliest_call_within_deadline);
}
for (const auto& call : timed_out_calls) {
call->RespondError(Status::TimedOut("timed out while in flight"));
}
}
}
// Entry point of the "writer" thread: drains calls from the outbound queue,
// registers them in the in-flight map (so responders and the deadline checker
// can find them), and writes their requests to the subprocess. Exits when the
// outbound queue is shut down.
void SubprocessServer::SendMessagesThread() {
  DCHECK(message_protocol_) << "message protocol is not initialized";
  Status s;
  do {
    vector<CallAndTimer> calls;
    // NOTE: since we don't supply a deadline, this will only fail if the queue
    // is shutting down. Also note that even if this fails because we're
    // shutting down, we still populate 'calls' and should add them to the
    // in-flight map. We'll run their callbacks as a part of shutdown.
    s = outbound_call_queue_.BlockingDrainTo(&calls);
    {
      std::lock_guard<simple_spinlock> l(in_flight_lock_);
      for (const auto& call_and_timer : calls) {
        const auto& call = call_and_timer.first;
        EmplaceOrDie(&call_by_id_, call->id(), call);
      }
    }
    // NOTE: it's possible that before sending the request, the call already
    // timed out and the deadline checker already called its callback. If so,
    // the following call will no-op.
    for (const auto& call_and_timer : calls) {
      const auto& call = call_and_timer.first;
      // Record how long the call spent waiting in the outbound queue.
      metrics_.server_outbound_queue_time_ms->Increment(
          call_and_timer.second.elapsed().ToMilliseconds());
      call->SendRequest(message_protocol_.get());
    }
  } while (s.ok());
  DCHECK(s.IsAborted());
  DCHECK_EQ(0, closing_.count());
  LOG(INFO) << "outbound queue shut down: " << s.ToString();
}
} // namespace subprocess
} // namespace kudu