| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/util/subprocess.h" |
| |
| #include <dirent.h> |
| #include <fcntl.h> |
| #include <signal.h> |
| #if defined(__linux__) |
| #include <sys/prctl.h> |
| #endif |
| #include <sys/wait.h> |
| #include <unistd.h> |
| |
| #include <cerrno> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <cstring> |
| #include <functional> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <ev++.h> |
| #include <glog/logging.h> |
| #include <glog/raw_logging.h> |
| #include <glog/stl_logging.h> |
| |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/numbers.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/errno.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/signal.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| |
| using std::map; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Split; |
| using strings::Substitute; |
| using strings::SubstituteAndAppend; |
| |
| namespace kudu { |
| |
| // Make glog's STL-compatible operators visible inside this namespace. |
| using ::operator<<; |
| |
| namespace { |
| |
// Upper bound, in seconds, on how long Kill() and KillAndWait() keep polling
// for the child process to change state before giving up.
static constexpr double kProcessWaitTimeoutSeconds = 5.0;

// Directory whose entries name the file descriptors open in this process.
static constexpr const char* kProcSelfFd =
#if defined(__APPLE__)
    "/dev/fd";
#else
    "/proc/self/fd";
#endif // defined(__APPLE__)

// Directory-entry API used by CloseNonStandardFDs(): the explicit 64-bit
// variants on Linux, the plain readdir()/dirent pair everywhere else.
#if defined(__linux__)
#define READDIR readdir64
#define DIRENT dirent64
#else
#define READDIR readdir
#define DIRENT dirent
#endif
| |
| // Since opendir() calls malloc(), this must be called before fork(). |
| // This function is not async-signal-safe. |
| Status OpenProcFdDir(DIR** dir) { |
| *dir = opendir(kProcSelfFd); |
| if (PREDICT_FALSE(dir == nullptr)) { |
| return Status::IOError(Substitute("opendir(\"$0\") failed", kProcSelfFd), |
| ErrnoToString(errno), errno); |
| } |
| return Status::OK(); |
| } |
| |
| // Close the directory stream opened by OpenProcFdDir(). |
| // This function is not async-signal-safe. |
| void CloseProcFdDir(DIR* dir) { |
| if (PREDICT_FALSE(closedir(dir) == -1)) { |
| LOG(WARNING) << "Unable to close fd dir: " |
| << Status::IOError(Substitute("closedir(\"$0\") failed", kProcSelfFd), |
| ErrnoToString(errno), errno).ToString(); |
| } |
| } |
| |
| // Close all open file descriptors other than stdin, stderr, stdout. |
| // Expects a directory stream created by OpenProdFdDir() as a parameter. |
| // This function is called after fork() and must not call malloc(). |
| // The rule of thumb is to only call async-signal-safe functions in such cases |
| // if at all possible. |
| void CloseNonStandardFDs(DIR* fd_dir) { |
| // This is implemented by iterating over the open file descriptors |
| // rather than using sysconf(SC_OPEN_MAX) -- the latter is error prone |
| // since it may not represent the highest open fd if the fd soft limit |
| // has changed since the process started. This should also be faster |
| // since iterating over all possible fds is likely to cause 64k+ syscalls |
| // in typical configurations. |
| // |
| // Note also that this doesn't use any of the Env utility functions, to |
| // make it as lean and mean as possible -- this runs in the subprocess |
| // after a fork, so there's some possibility that various global locks |
| // inside malloc() might be held, so allocating memory is a no-no. |
| RAW_CHECK(fd_dir != nullptr, "fd_dir is null"); |
| int dir_fd = dirfd(fd_dir); |
| |
| struct DIRENT* ent; |
| // readdir64() is not reentrant (it uses a static buffer) and it also |
| // locks fd_dir->lock, so it must not be called in a multi-threaded |
| // environment and is certainly not async-signal-safe. |
| // However, it appears to be safe to call right after fork(), since only one |
| // thread exists in the child process at that time. It also does not call |
| // malloc() or free(). We could use readdir64_r() instead, but all that |
| // buys us is reentrancy, and not async-signal-safety, due to the use of |
| // dir->lock, so seems not worth the added complexity in lifecycle & plumbing. |
| while ((ent = READDIR(fd_dir)) != nullptr) { |
| uint32_t fd; |
| if (!safe_strtou32(ent->d_name, &fd)) continue; |
| if (!(fd == STDIN_FILENO || |
| fd == STDOUT_FILENO || |
| fd == STDERR_FILENO || |
| fd == dir_fd)) { |
| int ret; |
| RETRY_ON_EINTR(ret, close(fd)); |
| } |
| } |
| } |
| |
| void RedirectToDevNull(int fd) { |
| // We must not close stderr or stdout, because then when a new file |
| // descriptor is opened, it might reuse the closed file descriptor's number |
| // (we always allocate the lowest available file descriptor number). |
| // |
| // Instead, we open /dev/null as a new file descriptor, then use dup2() to |
| // atomically close 'fd' and reuse its file descriptor number as an open file |
| // handle to /dev/null. |
| // |
| // It is expected that the file descriptor allocated when opening /dev/null |
| // will be closed when the child process closes all of its "non-standard" |
| // file descriptors later on. |
| int dev_null; |
| RETRY_ON_EINTR(dev_null, open("/dev/null", O_WRONLY)); |
| if (dev_null == -1) { |
| int err = errno; |
| RAW_LOG(WARNING, "failed to open /dev/null: [%d]", err); |
| } else { |
| int ret; |
| RETRY_ON_EINTR(ret, dup2(dev_null, fd)); |
| if (ret == -1) { |
| int err = errno; |
| RAW_LOG(FATAL, "dup2() on /dev/null failed: [%d]", err); |
| } |
| } |
| } |
| |
| // Stateful libev watcher to help ReadFdsFully(). |
| class ReadFdsFullyHelper { |
| public: |
| ReadFdsFullyHelper(string progname, ev::dynamic_loop* loop, int fd) |
| : progname_(std::move(progname)) { |
| // Bind the watcher to the provided loop, to this functor, and to the |
| // readable fd. |
| watcher_.set(*loop); |
| watcher_.set(this); |
| watcher_.set(fd, ev::READ); |
| |
| // The watcher will now be polled when its loop is run. |
| watcher_.start(); |
| } |
| |
| void operator() (ev::io &w, int revents) { |
| DCHECK_EQ(ev::READ, revents); |
| |
| char buf[1024]; |
| ssize_t n; |
| RETRY_ON_EINTR(n, read(w.fd, buf, arraysize(buf))); |
| if (n == 0) { |
| // EOF, stop watching. |
| w.stop(); |
| } else if (n < 0) { |
| // A fatal error. Store it and stop watching. |
| status_ = Status::IOError("IO error reading from " + progname_, |
| ErrnoToString(errno), errno); |
| w.stop(); |
| } else { |
| // Add our bytes and keep watching. |
| output_.append(buf, n); |
| } |
| } |
| |
| const Status& status() const { return status_; } |
| const string& output() const { return output_; } |
| |
| private: |
| const string progname_; |
| |
| ev::io watcher_; |
| string output_; |
| Status status_; |
| }; |
| |
| // Reads from all descriptors in 'fds' until EOF on all of them. If any read |
| // yields an error, it is returned. Otherwise, 'out' contains the bytes read |
| // for each fd, in the same order as was in 'fds'. |
| Status ReadFdsFully(const string& progname, |
| const vector<int>& fds, |
| vector<string>* out) { |
| ev::dynamic_loop loop; |
| |
| // Set up a watcher for each fd. |
| vector<unique_ptr<ReadFdsFullyHelper>> helpers; |
| for (int fd : fds) { |
| helpers.emplace_back(new ReadFdsFullyHelper(progname, &loop, fd)); |
| } |
| |
| // This will read until all fds return EOF. |
| loop.run(); |
| |
| // Check for failures. |
| for (const auto& h : helpers) { |
| if (!h->status().ok()) { |
| return h->status(); |
| } |
| } |
| |
| // No failures; write the output to the caller. |
| for (const auto& h : helpers) { |
| out->push_back(h->output()); |
| } |
| return Status::OK(); |
| } |
| |
| } // anonymous namespace |
| |
| Subprocess::Subprocess(vector<string> argv, int sig_on_destruct) |
| : program_(argv[0]), |
| argv_(std::move(argv)), |
| state_(kNotStarted), |
| child_pid_(-1), |
| fd_state_(), |
| child_fds_(), |
| sig_on_destruct_(sig_on_destruct) { |
| |
| fd_state_[STDIN_FILENO] = PIPED; |
| fd_state_[STDOUT_FILENO] = SHARED; |
| fd_state_[STDERR_FILENO] = SHARED; |
| child_fds_[STDIN_FILENO] = -1; |
| child_fds_[STDOUT_FILENO] = -1; |
| child_fds_[STDERR_FILENO] = -1; |
| } |
| |
| Subprocess::~Subprocess() { |
| if (state_ == kRunning) { |
| LOG(WARNING) << Substitute( |
| "Child process $0 ($1) was orphaned. Sending signal $2...", |
| child_pid_, JoinStrings(argv_, " "), sig_on_destruct_); |
| WARN_NOT_OK(KillAndWait(sig_on_destruct_), |
| Substitute("Failed to KillAndWait() with signal $0", |
| sig_on_destruct_)); |
| } |
| |
| for (int i = 0; i < 3; ++i) { |
| if (fd_state_[i] == PIPED && child_fds_[i] >= 0) { |
| int ret; |
| RETRY_ON_EINTR(ret, close(child_fds_[i])); |
| } |
| } |
| } |
| |
#if defined(__APPLE__)
// Minimal stand-in for pipe2() on macOS. Only 'flags' == O_CLOEXEC is
// supported.
//
// Creates a pipe and marks both ends close-on-exec. Unlike a native pipe2(),
// the flag is applied after the descriptors have been created. Returns 0 on
// success; on failure returns -1 with errno describing the failed call and
// leaves 'pipefd' untouched.
static int pipe2(int pipefd[2], int flags) {
  DCHECK_EQ(O_CLOEXEC, flags);

  int new_fds[2];
  if (pipe(new_fds) == -1) {
    return -1;
  }
  for (int fd : new_fds) {
    // F_SETFD takes file descriptor flags, so the close-on-exec bit must be
    // given as FD_CLOEXEC; O_CLOEXEC is an open() flag with a different value.
    if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1) {
      // Preserve the fcntl() error across the cleanup close() calls.
      const int saved_errno = errno;
      int ret;
      RETRY_ON_EINTR(ret, close(new_fds[0]));
      RETRY_ON_EINTR(ret, close(new_fds[1]));
      errno = saved_errno;
      return -1;
    }
  }
  pipefd[0] = new_fds[0];
  pipefd[1] = new_fds[1];
  return 0;
}
#endif
| |
| Status Subprocess::Start() { |
| VLOG(2) << "Invoking command: " << argv_; |
| if (state_ != kNotStarted) { |
| const string err_str = Substitute("$0: illegal sub-process state", state_); |
| LOG(DFATAL) << err_str; |
| return Status::IllegalState(err_str); |
| } |
| if (argv_.empty()) { |
| return Status::InvalidArgument("argv must have at least one elem"); |
| } |
| |
| // We explicitly set SIGPIPE to SIG_IGN here because we are using UNIX pipes. |
| IgnoreSigPipe(); |
| |
| vector<char*> argv_ptrs; |
| for (const string& arg : argv_) { |
| argv_ptrs.push_back(const_cast<char*>(arg.c_str())); |
| } |
| argv_ptrs.push_back(nullptr); |
| |
| // Pipe from caller process to child's stdin |
| // [0] = stdin for child, [1] = how parent writes to it |
| int child_stdin[2] = {-1, -1}; |
| if (fd_state_[STDIN_FILENO] == PIPED) { |
| PCHECK(pipe2(child_stdin, O_CLOEXEC) == 0); |
| } |
| // Pipe from child's stdout back to caller process |
| // [0] = how parent reads from child's stdout, [1] = how child writes to it |
| int child_stdout[2] = {-1, -1}; |
| if (fd_state_[STDOUT_FILENO] == PIPED) { |
| PCHECK(pipe2(child_stdout, O_CLOEXEC) == 0); |
| } |
| // Pipe from child's stderr back to caller process |
| // [0] = how parent reads from child's stderr, [1] = how child writes to it |
| int child_stderr[2] = {-1, -1}; |
| if (fd_state_[STDERR_FILENO] == PIPED) { |
| PCHECK(pipe2(child_stderr, O_CLOEXEC) == 0); |
| } |
| // The synchronization pipe: this trick is to make sure the parent returns |
| // control only after the child process has invoked execvp(). |
| int sync_pipe[2]; |
| PCHECK(pipe2(sync_pipe, O_CLOEXEC) == 0); |
| |
| DIR* fd_dir = nullptr; |
| RETURN_NOT_OK_PREPEND(OpenProcFdDir(&fd_dir), "Unable to open fd dir"); |
| unique_ptr<DIR, std::function<void(DIR*)>> fd_dir_closer(fd_dir, |
| CloseProcFdDir); |
| int ret; |
| RETRY_ON_EINTR(ret, fork()); |
| if (ret == -1) { |
| return Status::RuntimeError("Unable to fork", ErrnoToString(errno), errno); |
| } |
| if (ret == 0) { // We are the child |
| // As a general note, it's not safe to call non-async-signal-safe functions |
| // in the child process between fork() and exec(). Surprisingly, a call to |
| // LOG() locks a mutex that may have been copied from the parent's address |
| // space in an already locked state, so it is not async-signal-safe and |
| // can deadlock the child if called. So, in this vulnerable state the child |
| // outputs log messages using RAW_LOG() instead directly into stderr. |
| // RAW_LOG() uses vsnprintf() under the hood: it's not async-signal-safe |
| // strictly speaking (might call malloc() and getenv() in some cases which |
| // might acquire locks themselves), but it's much better than using LOG() |
| // where it can simply deadlock on glog's mutex. BTW, some allocators like |
| // tcmalloc install pthread_atfork() handlers, so with tcmalloc we have |
| // more safety with vsnprintf(). |
| // |
| // An alternative approach might be to use some additional functionality |
| // in glog library (once implemented) to establish thread_atfork() handlers; |
| // see https://github.com/robi56/google-glog/issues/101 for details. |
| |
| // Send the child a SIGTERM when the parent dies. This is done as early |
| // as possible in the child's life to prevent any orphaning whatsoever |
| // (e.g. from KUDU-402). |
| #if defined(__linux__) |
| // TODO: prctl(PR_SET_PDEATHSIG) is Linux-specific, look into portable ways |
| // to prevent orphans when parent is killed. |
| prctl(PR_SET_PDEATHSIG, SIGKILL); |
| #endif |
| |
| // stdin |
| if (fd_state_[STDIN_FILENO] == PIPED) { |
| int dup2_ret; |
| RETRY_ON_EINTR(dup2_ret, dup2(child_stdin[0], STDIN_FILENO)); |
| if (dup2_ret != STDIN_FILENO) { |
| int err = errno; |
| RAW_LOG(FATAL, "dup2() failed (STDIN): [%d]", err); |
| } |
| } else { |
| RAW_DCHECK(SHARED == fd_state_[STDIN_FILENO], |
| "unexpected state of STDIN"); |
| } |
| |
| // stdout |
| switch (fd_state_[STDOUT_FILENO]) { |
| case PIPED: { |
| int dup2_ret; |
| RETRY_ON_EINTR(dup2_ret, dup2(child_stdout[1], STDOUT_FILENO)); |
| if (dup2_ret != STDOUT_FILENO) { |
| int err = errno; |
| RAW_LOG(FATAL, "dup2() failed (STDOUT): [%d]", err); |
| } |
| break; |
| } |
| case DISABLED: { |
| RedirectToDevNull(STDOUT_FILENO); |
| break; |
| } |
| default: |
| RAW_DCHECK(SHARED == fd_state_[STDOUT_FILENO], |
| "unexpected state of STDOUT"); |
| break; |
| } |
| |
| // stderr |
| switch (fd_state_[STDERR_FILENO]) { |
| case PIPED: { |
| int dup2_ret; |
| RETRY_ON_EINTR(dup2_ret, dup2(child_stderr[1], STDERR_FILENO)); |
| if (dup2_ret != STDERR_FILENO) { |
| int err = errno; |
| RAW_LOG(FATAL, "dup2() failed (STDERR): [%d]", err); |
| } |
| break; |
| } |
| case DISABLED: { |
| RedirectToDevNull(STDERR_FILENO); |
| break; |
| } |
| default: |
| RAW_DCHECK(SHARED == fd_state_[STDERR_FILENO], |
| "unexpected state of STDERR"); |
| break; |
| } |
| |
| // Close the read side of the sync pipe; |
| // the write side should be closed upon execvp(). |
| int close_ret; |
| RETRY_ON_EINTR(close_ret, close(sync_pipe[0])); |
| if (close_ret == -1) { |
| int err = errno; |
| RAW_LOG(FATAL, "close() on the read side of sync pipe failed: [%d]", err); |
| } |
| |
| CloseNonStandardFDs(fd_dir); |
| |
| // Ensure we are not ignoring or blocking signals in the child process. |
| ResetAllSignalMasksToUnblocked(); |
| |
| // Reset the disposition of SIGPIPE to SIG_DFL because we routinely set its |
| // disposition to SIG_IGN via IgnoreSigPipe(). At the time of writing, we |
| // don't explicitly ignore any other signals in Kudu. |
| ResetSigPipeHandlerToDefault(); |
| |
| // Set the current working directory of the subprocess. |
| if (!cwd_.empty() && chdir(cwd_.c_str()) == -1) { |
| int err = errno; |
| RAW_LOG(FATAL, "chdir() to '%s' failed: [%d]", cwd_.c_str(), err); |
| } |
| |
| // Set the environment for the subprocess. This is more portable than |
| // using execvpe(), which doesn't exist on OS X. We rely on the 'p' |
| // variant of exec to do $PATH searching if the executable specified |
| // by the caller isn't an absolute path. |
| for (const auto& env : env_) { |
| ignore_result(setenv(env.first.c_str(), env.second.c_str(), 1 /* overwrite */)); |
| } |
| |
| execvp(program_.c_str(), &argv_ptrs[0]); |
| int err = errno; |
| RAW_LOG(ERROR, "could not exec '%s': [%d]", program_.c_str(), err); |
| _exit(err); |
| } else { |
| // We are the parent |
| child_pid_ = ret; |
| // Close child's side of the pipes |
| int close_ret; |
| if (fd_state_[STDIN_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stdin[0])); |
| if (fd_state_[STDOUT_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stdout[1])); |
| if (fd_state_[STDERR_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stderr[1])); |
| // Keep parent's side of the pipes |
| child_fds_[STDIN_FILENO] = child_stdin[1]; |
| child_fds_[STDOUT_FILENO] = child_stdout[0]; |
| child_fds_[STDERR_FILENO] = child_stderr[0]; |
| |
| // Wait for the child process to invoke execvp(). The trick involves |
| // a pipe with O_CLOEXEC option for its descriptors. The parent process |
| // performs blocking read from the pipe while the write side of the pipe |
| // is kept open by the child (it does not write any data, though). The write |
| // side of the pipe is closed when the child invokes execvp(). At that |
| // point, the parent should receive EOF, i.e. read() should return 0. |
| { |
| // Close the write side of the sync pipe. It's crucial to make sure |
| // it succeeds otherwise the blocking read() below might wait forever |
| // even if the child process has closed the pipe. |
| RETRY_ON_EINTR(close_ret, close(sync_pipe[1])); |
| PCHECK(close_ret == 0); |
| while (true) { |
| uint8_t buf; |
| int err = 0; |
| int rc; |
| RETRY_ON_EINTR(rc, read(sync_pipe[0], &buf, 1)); |
| if (rc == -1) { |
| err = errno; |
| } |
| RETRY_ON_EINTR(close_ret, close(sync_pipe[0])); |
| PCHECK(close_ret == 0); |
| if (rc == 0) { |
| // That's OK -- expecting EOF from the other side of the pipe. |
| break; |
| } else if (rc == -1) { |
| // Other errors besides EINTR are not expected. |
| return Status::RuntimeError("Unexpected error from the sync pipe", |
| ErrnoToString(err), err); |
| } |
| // No data is expected from the sync pipe. |
| LOG(FATAL) << Substitute("$0: unexpected data from the sync pipe", rc); |
| } |
| } |
| } |
| |
| state_ = kRunning; |
| return Status::OK(); |
| } |
| |
| Status Subprocess::Wait(int* wait_status) { |
| return DoWait(wait_status, BLOCKING); |
| } |
| |
| Status Subprocess::WaitNoBlock(int* wait_status) { |
| return DoWait(wait_status, NON_BLOCKING); |
| } |
| |
| Status Subprocess::WaitAndCheckExitCode() { |
| int wait_status; |
| RETURN_NOT_OK(DoWait(&wait_status, BLOCKING)); |
| int exit_status; |
| string info_str; |
| |
| RETURN_NOT_OK(GetExitStatus(&exit_status, &info_str)); |
| |
| return exit_status == 0 |
| ? Status::OK() |
| : Status::RuntimeError(Substitute("Exit code: $0 ($1)", |
| exit_status, info_str)); |
| } |
| |
| Status Subprocess::GetProcfsState(int pid, ProcfsState* state) { |
| faststring data; |
| string filename = Substitute("/proc/$0/stat", pid); |
| RETURN_NOT_OK(ReadFileToString(Env::Default(), filename, &data)); |
| |
| // The part of /proc/<pid>/stat that's relevant for us looks like this: |
| // |
| // "16009 (subprocess-test) R ..." |
| // |
| // The first number is the PID, the string in the parens in the command, and |
| // the single letter afterwards is the process' state. |
| // |
| // To extract the state, we scan backwards looking for the last ')', then |
| // increment past it and the separating space. This is safer than scanning |
| // forward as it properly handles commands containing parens. |
| string data_str = data.ToString(); |
| const char* end_parens = strrchr(data_str.c_str(), ')'); |
| if (end_parens == nullptr) { |
| return Status::RuntimeError(Substitute("unexpected layout in $0", filename)); |
| } |
| char proc_state = end_parens[2]; |
| |
| switch (proc_state) { |
| case 'T': |
| *state = ProcfsState::PAUSED; |
| break; |
| default: |
| *state = ProcfsState::RUNNING; |
| break; |
| } |
| return Status::OK(); |
| } |
| |
| Status Subprocess::Kill(int signal) { |
| if (state_ != kRunning) { |
| const string err_str = "Sub-process is not running"; |
| LOG(DFATAL) << err_str; |
| return Status::IllegalState(err_str); |
| } |
| if (kill(child_pid_, signal) != 0) { |
| return Status::RuntimeError("Unable to kill", |
| ErrnoToString(errno), |
| errno); |
| } |
| |
| // Signal delivery is often asynchronous. For some signals, we try to wait |
| // for the process to actually change state, using /proc/<pid>/stat as a |
| // guide. This is best-effort. |
| ProcfsState desired_state; |
| switch (signal) { |
| case SIGSTOP: |
| desired_state = ProcfsState::PAUSED; |
| break; |
| case SIGCONT: |
| desired_state = ProcfsState::RUNNING; |
| break; |
| default: |
| return Status::OK(); |
| } |
| Stopwatch sw; |
| sw.start(); |
| do { |
| ProcfsState current_state; |
| if (!GetProcfsState(child_pid_, ¤t_state).ok()) { |
| // There was some error parsing /proc/<pid>/stat (or perhaps it doesn't |
| // exist on this platform). |
| return Status::OK(); |
| } |
| if (current_state == desired_state) { |
| return Status::OK(); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds); |
| return Status::OK(); |
| } |
| |
| Status Subprocess::KillAndWait(int signal) { |
| if (state_ != kRunning) { |
| const string err_str = "Sub-process is not running"; |
| LOG(DFATAL) << err_str; |
| return Status::IllegalState(err_str); |
| } |
| string procname = Substitute("$0 (pid $1)", argv0(), pid()); |
| |
| // This is a fatal error because all errors in Kill() are signal-independent, |
| // so Kill(SIGKILL) is just as likely to fail if this did. |
| RETURN_NOT_OK_PREPEND( |
| Kill(signal), Substitute("Failed to send signal $0 to $1", |
| signal, procname)); |
| if (signal == SIGKILL) { |
| RETURN_NOT_OK_PREPEND( |
| Wait(), Substitute("Failed to wait on $0", procname)); |
| } else { |
| Status s; |
| Stopwatch sw; |
| sw.start(); |
| do { |
| s = WaitNoBlock(); |
| if (s.ok()) { |
| break; |
| } else if (!s.IsTimedOut()) { |
| // An unexpected error in WaitNoBlock() is likely to manifest repeatedly, |
| // so there's no point in retrying this. |
| RETURN_NOT_OK_PREPEND( |
| s, Substitute("Unexpected failure while waiting on $0", procname)); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds); |
| if (s.IsTimedOut()) { |
| return KillAndWait(SIGKILL); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status Subprocess::GetExitStatus(int* exit_status, string* info_str) const { |
| if (state_ != kExited) { |
| const string err_str = "Sub-process termination hasn't yet been detected"; |
| LOG(DFATAL) << err_str; |
| return Status::IllegalState(err_str); |
| } |
| string info; |
| int status; |
| if (WIFEXITED(wait_status_)) { |
| status = WEXITSTATUS(wait_status_); |
| if (status == 0) { |
| info = Substitute("$0: process successfully exited", program_); |
| } else { |
| info = Substitute("$0: process exited with non-zero status $1", |
| program_, status); |
| } |
| } else if (WIFSIGNALED(wait_status_)) { |
| // Using signal number as exit status. |
| status = WTERMSIG(wait_status_); |
| info = Substitute("$0: process exited on signal $1", program_, status); |
| #if defined(WCOREDUMP) |
| if (WCOREDUMP(wait_status_)) { |
| SubstituteAndAppend(&info, " (core dumped)"); |
| } |
| #endif |
| } else { |
| status = -1; |
| info = Substitute("$0: process reported unexpected wait status $1", |
| program_, wait_status_); |
| LOG(DFATAL) << info; |
| } |
| if (exit_status) { |
| *exit_status = status; |
| } |
| if (info_str) { |
| *info_str = info; |
| } |
| return Status::OK(); |
| } |
| |
| Status Subprocess::Call(const string& arg_str) { |
| vector<string> argv = Split(arg_str, " "); |
| return Call(argv, "", nullptr, nullptr); |
| } |
| |
| Status Subprocess::Call(const vector<string>& argv, |
| const string& stdin_in, |
| string* stdout_out, |
| string* stderr_out, |
| map<string, string> env_vars) { |
| Subprocess p(argv); |
| |
| if (stdout_out) { |
| p.ShareParentStdout(false); |
| } |
| if (stderr_out) { |
| p.ShareParentStderr(false); |
| } |
| |
| if (!env_vars.empty()) { |
| p.SetEnvVars(std::move(env_vars)); |
| } |
| |
| RETURN_NOT_OK_PREPEND(p.Start(), |
| "Unable to fork " + argv[0]); |
| |
| if (!stdin_in.empty()) { |
| ssize_t written; |
| RETRY_ON_EINTR(written, write(p.to_child_stdin_fd(), stdin_in.data(), stdin_in.size())); |
| if (written < stdin_in.size()) { |
| return Status::IOError("Unable to write to child process stdin", |
| ErrnoToString(errno), errno); |
| } |
| } |
| |
| int err; |
| RETRY_ON_EINTR(err, close(p.ReleaseChildStdinFd())); |
| if (PREDICT_FALSE(err != 0)) { |
| return Status::IOError("Unable to close child process stdin", ErrnoToString(errno), errno); |
| } |
| |
| vector<int> fds; |
| if (stdout_out) { |
| fds.push_back(p.from_child_stdout_fd()); |
| } |
| if (stderr_out) { |
| fds.push_back(p.from_child_stderr_fd()); |
| } |
| vector<string> outv; |
| RETURN_NOT_OK(ReadFdsFully(argv[0], fds, &outv)); |
| |
| // Given that ReadFdsFully captures the strings in the order in which we |
| // had installed 'fds' above, it can be assured that we can receive |
| // as many strings as there were 'fds' in the vector and in that order. |
| CHECK_EQ(outv.size(), fds.size()); |
| if (stdout_out) { |
| *stdout_out = std::move(outv.front()); |
| } |
| if (stderr_out) { |
| *stderr_out = std::move(outv.back()); |
| } |
| |
| RETURN_NOT_OK_PREPEND(p.Wait(), "Unable to wait() for " + argv[0]); |
| int exit_status; |
| string exit_info_str; |
| RETURN_NOT_OK(p.GetExitStatus(&exit_status, &exit_info_str)); |
| if (exit_status != 0) { |
| return Status::RuntimeError(exit_info_str); |
| } |
| return Status::OK(); |
| } |
| |
| pid_t Subprocess::pid() const { |
| CHECK_EQ(state_, kRunning); |
| return child_pid_; |
| } |
| |
| Status Subprocess::DoWait(int* wait_status, WaitMode mode) { |
| if (state_ == kExited) { |
| if (wait_status) { |
| *wait_status = wait_status_; |
| } |
| return Status::OK(); |
| } |
| if (state_ != kRunning) { |
| const string err_str = Substitute("$0: illegal sub-process state", state_); |
| LOG(DFATAL) << err_str; |
| return Status::IllegalState(err_str); |
| } |
| |
| const int options = (mode == NON_BLOCKING) ? WNOHANG : 0; |
| int status; |
| int rc; |
| RETRY_ON_EINTR(rc, waitpid(child_pid_, &status, options)); |
| if (rc == -1) { |
| return Status::RuntimeError("Unable to wait on child", |
| ErrnoToString(errno), errno); |
| } |
| if (mode == NON_BLOCKING && rc == 0) { |
| return Status::TimedOut(""); |
| } |
| CHECK_EQ(rc, child_pid_); |
| CHECK(WIFEXITED(status) || WIFSIGNALED(status)); |
| |
| child_pid_ = -1; |
| wait_status_ = status; |
| state_ = kExited; |
| if (wait_status) { |
| *wait_status = status; |
| } |
| return Status::OK(); |
| } |
| |
| void Subprocess::SetEnvVars(map<string, string> env) { |
| CHECK_EQ(state_, kNotStarted); |
| env_ = std::move(env); |
| } |
| |
| void Subprocess::SetCurrentDir(string cwd) { |
| CHECK_EQ(state_, kNotStarted); |
| cwd_ = std::move(cwd); |
| } |
| |
| void Subprocess::SetFdShared(int stdfd, bool share) { |
| CHECK_EQ(state_, kNotStarted); |
| fd_state_[stdfd] = share ? SHARED : PIPED; |
| } |
| |
| void Subprocess::DisableStderr() { |
| CHECK_EQ(state_, kNotStarted); |
| fd_state_[STDERR_FILENO] = DISABLED; |
| } |
| |
| void Subprocess::DisableStdout() { |
| CHECK_EQ(state_, kNotStarted); |
| fd_state_[STDOUT_FILENO] = DISABLED; |
| } |
| |
| int Subprocess::CheckAndOffer(int stdfd) const { |
| CHECK_EQ(state_, kRunning); |
| CHECK_EQ(fd_state_[stdfd], PIPED); |
| return child_fds_[stdfd]; |
| } |
| |
| int Subprocess::ReleaseChildFd(int stdfd) { |
| CHECK_EQ(state_, kRunning); |
| CHECK_GE(child_fds_[stdfd], 0); |
| CHECK_EQ(fd_state_[stdfd], PIPED); |
| int ret = child_fds_[stdfd]; |
| child_fds_[stdfd] = -1; |
| return ret; |
| } |
| |
| bool Subprocess::IsStarted() { |
| return state_ != kNotStarted; |
| } |
| |
| } // namespace kudu |