blob: 014568abfcbe23fa985746b24bf6a119a4ef12bc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/util/subprocess.h"
#include <dirent.h>
#include <fcntl.h>
#include <glog/logging.h>
#include <memory>
#include <signal.h>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>
#if defined(__linux__)
#include <sys/prctl.h>
#endif
#include "kudu/gutil/once.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/errno.h"
#include "kudu/util/status.h"
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Split;
using strings::Substitute;
namespace kudu {
namespace {
// Path of the directory that OpenProcFdDir() opens so that
// CloseNonStandardFDs() can enumerate the descriptors to close:
// "/dev/fd" when built for Apple platforms, "/proc/self/fd" otherwise.
static const char* kProcSelfFd =
#if defined(__APPLE__)
"/dev/fd";
#else
"/proc/self/fd";
#endif // defined(__APPLE__)
// READDIR / DIRENT name the directory-reading call and the entry type used by
// CloseNonStandardFDs(): readdir64/dirent64 on Linux, readdir/dirent elsewhere.
#if defined(__linux__)
#define READDIR readdir64
#define DIRENT dirent64
#else
#define READDIR readdir
#define DIRENT dirent
#endif
// Sets the disposition of SIGPIPE to SIG_IGN for this process.
// Crashes via PCHECK if sigaction() reports failure.
void DisableSigPipe() {
  struct sigaction act;
  // No flags and an empty blocked-signal mask: the only thing this action
  // does is ignore the signal.
  act.sa_flags = 0;
  sigemptyset(&act.sa_mask);
  act.sa_handler = SIG_IGN;

  PCHECK(sigaction(SIGPIPE, &act, nullptr) == 0);
}
void EnsureSigPipeDisabled() {
static GoogleOnceType once = GOOGLE_ONCE_INIT;
GoogleOnceInit(&once, &DisableSigPipe);
}
// Opens a directory stream over kProcSelfFd and stores it in '*dir'.
//
// Returns Status::OK() on success. If opendir() fails, '*dir' is left null
// and an IOError carrying the errno of the failure is returned.
//
// Since opendir() calls malloc(), this must be called before fork().
// This function is not async-signal-safe.
Status OpenProcFdDir(DIR** dir) {
  *dir = opendir(kProcSelfFd);
  // Check the stream opendir() handed back, not the address of the
  // out-parameter itself (which is never null for a valid caller).
  if (PREDICT_FALSE(*dir == nullptr)) {
    // Capture errno before building the message so that nothing called while
    // formatting can overwrite it.
    const int err = errno;
    return Status::IOError(Substitute("opendir(\"$0\") failed", kProcSelfFd),
                           ErrnoToString(err), err);
  }
  return Status::OK();
}
// Close the directory stream opened by OpenProcFdDir().
// This function is not async-signal-safe.
void CloseProcFdDir(DIR* dir) {
if (PREDICT_FALSE(closedir(dir) == -1)) {
LOG(WARNING) << "Unable to close fd dir: "
<< Status::IOError(Substitute("closedir(\"$0\") failed", kProcSelfFd),
ErrnoToString(errno), errno).ToString();
}
}
// Closes every open file descriptor except stdin, stdout, stderr and the
// descriptor backing 'fd_dir' itself.
//
// 'fd_dir' must be a directory stream created by OpenProcFdDir().
//
// This runs in the child after fork() and therefore must not call malloc():
// some thread in the parent may have held a lock inside the allocator at the
// moment of the fork. As a rule of thumb, only async-signal-safe functions
// should be called here if at all possible.
void CloseNonStandardFDs(DIR* fd_dir) {
  // The open descriptors are enumerated from the fd directory instead of
  // probing every value up to sysconf(SC_OPEN_MAX). The latter is error prone
  // (the soft limit may have changed since the process started, so it need
  // not bound the highest open fd) and slow (typically 64k+ syscalls).
  //
  // None of the Env utility functions are used, to keep this path free of
  // memory allocation.
  PCHECK(fd_dir != nullptr);
  int dir_fd = dirfd(fd_dir);

  // readdir64() is not reentrant (static buffer) and takes fd_dir->lock, so
  // it is neither thread-safe nor async-signal-safe. It nevertheless appears
  // safe right after fork(), when the child has exactly one thread, and it
  // does not call malloc() or free(). readdir64_r() would only add
  // reentrancy, not async-signal-safety (it still uses dir->lock), so it is
  // not worth the extra lifecycle and plumbing complexity.
  for (struct DIRENT* ent = READDIR(fd_dir);
       ent != nullptr;
       ent = READDIR(fd_dir)) {
    uint32_t fd;
    // Skip entries whose names are not plain unsigned numbers.
    if (!safe_strtou32(ent->d_name, &fd)) continue;

    const bool keep_open = fd == STDIN_FILENO ||
                           fd == STDOUT_FILENO ||
                           fd == STDERR_FILENO ||
                           fd == dir_fd;
    if (keep_open) continue;

    close(fd);
  }
}
} // anonymous namespace
// Records the program and argument vector for a child that has not been
// started yet. By default stdin is PIPED while stdout and stderr are SHARED;
// no parent-side descriptors exist until Start() runs, so all three slots in
// child_fds_ hold -1.
Subprocess::Subprocess(string program, vector<string> argv)
    : program_(std::move(program)),
      argv_(std::move(argv)),
      state_(kNotStarted),
      child_pid_(-1),
      fd_state_(),
      child_fds_() {
  // stdin
  fd_state_[STDIN_FILENO] = PIPED;
  child_fds_[STDIN_FILENO] = -1;

  // stdout
  fd_state_[STDOUT_FILENO] = SHARED;
  child_fds_[STDOUT_FILENO] = -1;

  // stderr
  fd_state_[STDERR_FILENO] = SHARED;
  child_fds_[STDERR_FILENO] = -1;
}
// If the child is still running it is sent SIGKILL and waited for (failures
// of either step are only logged). Afterwards every parent-side pipe end that
// is still held by this object is closed.
Subprocess::~Subprocess() {
  const bool still_running = (state_ == kRunning);
  if (still_running) {
    LOG(WARNING) << "Child process " << child_pid_
                 << "(" << JoinStrings(argv_, " ") << ") "
                 << " was orphaned. Sending SIGKILL...";
    WARN_NOT_OK(Kill(SIGKILL), "Failed to send SIGKILL");
    int ignored_wait_status = 0;
    WARN_NOT_OK(Wait(&ignored_wait_status), "Failed to Wait()");
  }

  for (int i = 0; i < 3; ++i) {
    // Only piped streams own a descriptor, and a slot of -1 means it was
    // never created or has already been released to the caller.
    if (fd_state_[i] != PIPED) continue;
    if (child_fds_[i] < 0) continue;
    close(child_fds_[i]);
  }
}
// Marks the standard stream 'stdfd' as SHARED when 'share' is true and as
// PIPED otherwise. Must be called before Start(), and not on a stream that
// has been disabled; both conditions are enforced with CHECKs.
void Subprocess::SetFdShared(int stdfd, bool share) {
  CHECK_EQ(state_, kNotStarted);
  CHECK_NE(fd_state_[stdfd], DISABLED);
  if (share) {
    fd_state_[stdfd] = SHARED;
  } else {
    fd_state_[stdfd] = PIPED;
  }
}
// Marks the child's stderr as DISABLED; Start() redirects a disabled stream
// to /dev/null in the child. Only legal before the process has been started
// (CHECK-enforced).
void Subprocess::DisableStderr() {
  CHECK_EQ(state_, kNotStarted);

  fd_state_[STDERR_FILENO] = DISABLED;
}
// Marks the child's stdout as DISABLED; Start() redirects a disabled stream
// to /dev/null in the child. Only legal before the process has been started
// (CHECK-enforced).
void Subprocess::DisableStdout() {
  CHECK_EQ(state_, kNotStarted);

  fd_state_[STDOUT_FILENO] = DISABLED;
}
// Makes descriptor 'fd' refer to /dev/null (opened write-only).
//
// If /dev/null cannot be opened a warning is logged and 'fd' is left
// untouched. If the dup2() itself fails the process is aborted via PCHECK.
// The extra descriptor obtained from open() is not closed here; in Start()
// the call to CloseNonStandardFDs() that follows takes care of it.
static void RedirectToDevNull(int fd) {
  // We must not close stderr or stdout, because then when a new file descriptor
  // gets opened, it might get that fd number. (We always allocate the lowest
  // available file descriptor number.) Instead, we reopen that fd as
  // /dev/null.
  int dev_null = open("/dev/null", O_WRONLY);
  if (dev_null < 0) {
    PLOG(WARNING) << "failed to open /dev/null";
    return;
  }
  // dup2() returns the target descriptor on success and -1 on failure, so the
  // result has to be compared against 'fd': a bare truthiness test would
  // accept -1 as success.
  PCHECK(dup2(dev_null, fd) == fd);
}
#if defined(__APPLE__)
// Stand-in for pipe2() on Apple builds: creates a pipe and marks both ends
// close-on-exec. Only 'flags' == O_CLOEXEC is supported (DCHECK-enforced).
//
// On success stores the read end in pipefd[0] and the write end in pipefd[1]
// and returns 0. On failure returns -1 with errno describing the failing
// call, leaves 'pipefd' untouched and leaks no descriptors.
static int pipe2(int pipefd[2], int flags) {
  DCHECK_EQ(O_CLOEXEC, flags);
  int new_fds[2];
  if (pipe(new_fds) == -1) {
    return -1;
  }
  for (int i = 0; i < 2; ++i) {
    // F_SETFD takes descriptor flags, i.e. FD_CLOEXEC. O_CLOEXEC is an
    // open(2) flag with a different value and does not set close-on-exec
    // when passed here.
    if (fcntl(new_fds[i], F_SETFD, FD_CLOEXEC) == -1) {
      // Keep the fcntl() error visible to the caller across the cleanup.
      const int saved_errno = errno;
      close(new_fds[0]);
      close(new_fds[1]);
      errno = saved_errno;
      return -1;
    }
  }
  pipefd[0] = new_fds[0];
  pipefd[1] = new_fds[1];
  return 0;
}
#endif
// Forks and execs program_ with argv_, wiring the child's standard streams
// according to fd_state_:
//   PIPED    - the stream is connected to a pipe whose other end is kept by
//              the parent in child_fds_;
//   DISABLED - stdout/stderr are redirected to /dev/null in the child;
//   SHARED   - the stream is left as inherited from the parent.
//
// Must be called while the object is in the kNotStarted state (CHECK).
// Returns InvalidArgument if argv_ is empty, an error if the fd directory
// cannot be opened, or RuntimeError if fork() fails; in all of those cases
// any pipes created by this call are closed again and the object stays in
// kNotStarted. Failure to create a pipe aborts the process (PCHECK).
// On success the state becomes kRunning and child_pid_ holds the child's pid.
Status Subprocess::Start() {
  CHECK_EQ(state_, kNotStarted);
  EnsureSigPipeDisabled();
  if (argv_.size() < 1) {
    return Status::InvalidArgument("argv must have at least one elem");
  }
  // Build the null-terminated char* array that execvp() expects. This
  // allocates, so it has to happen before fork().
  vector<char*> argv_ptrs;
  for (const string& arg : argv_) {
    argv_ptrs.push_back(const_cast<char*>(arg.c_str()));
  }
  argv_ptrs.push_back(nullptr);

  // Pipe from caller process to child's stdin
  // [0] = stdin for child, [1] = how parent writes to it
  int child_stdin[2] = {-1, -1};
  // Pipe from child's stdout back to caller process
  // [0] = how parent reads from child's stdout, [1] = how child writes to it
  int child_stdout[2] = {-1, -1};
  // Pipe from child's stderr back to caller process
  // [0] = how parent reads from child's stderr, [1] = how child writes to it
  int child_stderr[2] = {-1, -1};

  // Closes whatever pipe ends are open if we return before the parent has
  // taken ownership of them, so that a failed Start() does not leak
  // descriptors. Declared after the arrays so it is destroyed before them.
  struct PipeCleanup {
    int* pipes[3];
    bool armed;
    ~PipeCleanup() {
      if (!armed) return;
      for (int* ends : pipes) {
        for (int i = 0; i < 2; ++i) {
          if (ends[i] >= 0) close(ends[i]);
        }
      }
    }
  };
  PipeCleanup pipe_cleanup = {{child_stdin, child_stdout, child_stderr}, true};

  if (fd_state_[STDIN_FILENO] == PIPED) {
    PCHECK(pipe2(child_stdin, O_CLOEXEC) == 0);
  }
  if (fd_state_[STDOUT_FILENO] == PIPED) {
    PCHECK(pipe2(child_stdout, O_CLOEXEC) == 0);
  }
  if (fd_state_[STDERR_FILENO] == PIPED) {
    PCHECK(pipe2(child_stderr, O_CLOEXEC) == 0);
  }

  // The fd directory must be opened before fork(): opendir() allocates, and
  // the child is not allowed to.
  DIR* fd_dir = nullptr;
  RETURN_NOT_OK_PREPEND(OpenProcFdDir(&fd_dir), "Unable to open fd dir");
  shared_ptr<DIR> fd_dir_closer(fd_dir, CloseProcFdDir);

  int ret = fork();
  if (ret == -1) {
    // The Status is built from errno before the cleanup destructors run.
    return Status::RuntimeError("Unable to fork", ErrnoToString(errno), errno);
  }
  if (ret == 0) { // We are the child
    // Send the child a SIGTERM when the parent dies. This is done as early
    // as possible in the child's life to prevent any orphaning whatsoever
    // (e.g. from KUDU-402).
#if defined(__linux__)
    // TODO: prctl(PR_SET_PDEATHSIG) is Linux-specific, look into portable ways
    // to prevent orphans when parent is killed.
    prctl(PR_SET_PDEATHSIG, SIGTERM);
#endif
    // stdin
    if (fd_state_[STDIN_FILENO] == PIPED) {
      PCHECK(dup2(child_stdin[0], STDIN_FILENO) == STDIN_FILENO);
    }
    // stdout
    switch (fd_state_[STDOUT_FILENO]) {
      case PIPED: {
        PCHECK(dup2(child_stdout[1], STDOUT_FILENO) == STDOUT_FILENO);
        break;
      }
      case DISABLED: {
        RedirectToDevNull(STDOUT_FILENO);
        break;
      }
      default: break;
    }
    // stderr
    switch (fd_state_[STDERR_FILENO]) {
      case PIPED: {
        PCHECK(dup2(child_stderr[1], STDERR_FILENO) == STDERR_FILENO);
        break;
      }
      case DISABLED: {
        RedirectToDevNull(STDERR_FILENO);
        break;
      }
      default: break;
    }
    // Everything except the three standard streams (and the directory
    // stream being iterated) is closed before exec.
    CloseNonStandardFDs(fd_dir);
    execvp(program_.c_str(), &argv_ptrs[0]);
    // execvp() only returns on failure. Leave with _exit() so that no
    // destructors or atexit handlers inherited from the parent run here.
    PLOG(WARNING) << "Couldn't exec " << program_;
    _exit(errno);
  } else {
    // We are the parent
    child_pid_ = ret;
    // From here on the descriptors are managed explicitly below.
    pipe_cleanup.armed = false;
    // Close child's side of the pipes
    if (fd_state_[STDIN_FILENO] == PIPED) close(child_stdin[0]);
    if (fd_state_[STDOUT_FILENO] == PIPED) close(child_stdout[1]);
    if (fd_state_[STDERR_FILENO] == PIPED) close(child_stderr[1]);
    // Keep parent's side of the pipes
    child_fds_[STDIN_FILENO] = child_stdin[1];
    child_fds_[STDOUT_FILENO] = child_stdout[0];
    child_fds_[STDERR_FILENO] = child_stderr[0];
  }
  state_ = kRunning;
  return Status::OK();
}
// Waits for the child with waitpid(child_pid_, ret, options).
//
// If the child has already been reaped, '*ret' receives the cached wait
// status and OK is returned without another system call. Otherwise the object
// must be in the kRunning state (CHECK).
//
// Returns:
//   OK           - the child was reaped; '*ret' holds the status reported by
//                  waitpid(), which is also cached, and the state becomes
//                  kExited.
//   TimedOut     - 'options' contains WNOHANG and the child has not changed
//                  state yet.
//   RuntimeError - waitpid() failed for a reason other than EINTR.
Status Subprocess::DoWait(int* ret, int options) {
  if (state_ == kExited) {
    *ret = cached_rc_;
    return Status::OK();
  }
  CHECK_EQ(state_, kRunning);

  // A signal delivered to the caller can interrupt waitpid(); that is not a
  // failure of the wait itself, so simply try again.
  int rc;
  do {
    rc = waitpid(child_pid_, ret, options);
  } while (rc == -1 && errno == EINTR);

  if (rc == -1) {
    const int err = errno;
    return Status::RuntimeError("Unable to wait on child",
                                ErrnoToString(err),
                                err);
  }
  if ((options & WNOHANG) && rc == 0) {
    return Status::TimedOut("");
  }
  CHECK_EQ(rc, child_pid_);
  child_pid_ = -1;
  cached_rc_ = *ret;
  state_ = kExited;
  return Status::OK();
}
// Delivers 'signal' to the child with kill(2). The child must be running
// (CHECK). Returns OK if the signal was sent, otherwise a RuntimeError
// carrying errno.
Status Subprocess::Kill(int signal) {
  CHECK_EQ(state_, kRunning);
  const bool delivered = (kill(child_pid_, signal) == 0);
  if (delivered) {
    return Status::OK();
  }
  return Status::RuntimeError("Unable to kill",
                              ErrnoToString(errno),
                              errno);
}
// Convenience overload: splits 'arg_str' on single spaces and forwards the
// resulting words to Call(const vector<string>&). No quoting or escaping is
// interpreted by the split.
Status Subprocess::Call(const string& arg_str) {
  VLOG(2) << "Invoking command: " << arg_str;
  const vector<string> words = Split(arg_str, " ");
  return Call(words);
}
// Runs argv[0] with arguments 'argv' and blocks until it terminates.
//
// Returns InvalidArgument if 'argv' is empty, any error from Start() or
// Wait(), a RuntimeError if the status reported by Wait() is non-zero, and OK
// otherwise.
// NOTE(review): the value printed in the error message is whatever Wait()
// stores in 'retcode'; it appears to be the raw wait status rather than a
// decoded exit code -- confirm against Wait().
Status Subprocess::Call(const vector<string>& argv) {
  // argv[0] is read below to name the program, so reject an empty vector
  // before touching it.
  if (argv.empty()) {
    return Status::InvalidArgument("argv must have at least one elem");
  }
  Subprocess proc(argv[0], argv);
  RETURN_NOT_OK(proc.Start());
  int retcode = 0;
  RETURN_NOT_OK(proc.Wait(&retcode));
  if (retcode != 0) {
    return Status::RuntimeError(Substitute(
        "Subprocess '$0' terminated with non-zero exit status $1",
        argv[0],
        retcode));
  }
  return Status::OK();
}
// Runs argv[0] with arguments 'argv', collects everything the child writes to
// its stdout into '*stdout_out' (which is cleared first), and blocks until
// the child terminates.
//
// The child's stdin pipe is closed immediately after start, so the child sees
// EOF if it reads from stdin.
//
// Returns InvalidArgument if 'argv' is empty, IOError if closing stdin or
// reading stdout fails, any error from Start() or Wait(), a RuntimeError if
// the status reported by Wait() is non-zero, and OK otherwise.
Status Subprocess::Call(const vector<string>& argv, string* stdout_out) {
  // argv[0] is read below to name the program, so reject an empty vector
  // before touching it.
  if (argv.empty()) {
    return Status::InvalidArgument("argv must have at least one elem");
  }
  Subprocess p(argv[0], argv);
  // Presumably switches stdout from shared to piped so it can be read below;
  // ShareParentStdout() is declared outside this file.
  p.ShareParentStdout(false);
  RETURN_NOT_OK_PREPEND(p.Start(), "Unable to fork " + argv[0]);
  int err = close(p.ReleaseChildStdinFd());
  if (PREDICT_FALSE(err != 0)) {
    return Status::IOError("Unable to close child process stdin", ErrnoToString(errno), errno);
  }
  stdout_out->clear();
  char buf[1024];
  while (true) {
    ssize_t n = read(p.from_child_stdout_fd(), buf, arraysize(buf));
    if (n == 0) {
      // EOF
      break;
    }
    if (n < 0) {
      // An interrupted read is retried; anything else is a real error.
      if (errno == EINTR) continue;
      return Status::IOError("IO error reading from " + argv[0], ErrnoToString(errno), errno);
    }
    stdout_out->append(buf, n);
  }
  int retcode = 0;
  RETURN_NOT_OK_PREPEND(p.Wait(&retcode), "Unable to wait() for " + argv[0]);
  if (PREDICT_FALSE(retcode != 0)) {
    return Status::RuntimeError(Substitute(
        "Subprocess '$0' terminated with non-zero exit status $1",
        argv[0],
        retcode));
  }
  return Status::OK();
}
// Returns the parent-side descriptor of the pipe attached to the child's
// standard stream 'stdfd', without giving up ownership. The child must be
// running and the stream must be PIPED (both CHECK-enforced).
int Subprocess::CheckAndOffer(int stdfd) const {
  CHECK_EQ(state_, kRunning);
  CHECK_EQ(fd_state_[stdfd], PIPED);
  const int parent_end = child_fds_[stdfd];
  return parent_end;
}
// Hands the parent-side descriptor for standard stream 'stdfd' over to the
// caller: the stored slot is reset to -1, so the destructor will no longer
// close it. The child must be running, the descriptor must still be held,
// and the stream must be PIPED (all CHECK-enforced).
int Subprocess::ReleaseChildFd(int stdfd) {
  CHECK_EQ(state_, kRunning);
  CHECK_GE(child_fds_[stdfd], 0);
  CHECK_EQ(fd_state_[stdfd], PIPED);

  const int released_fd = child_fds_[stdfd];
  child_fds_[stdfd] = -1;
  return released_fd;
}
// Returns the process id of the child. Only valid while the child is running
// (CHECK-enforced).
pid_t Subprocess::pid() const {
  CHECK_EQ(state_, kRunning);

  return child_pid_;
}
} // namespace kudu