| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <signal.h> |
| #include <stdlib.h> |
| #include <unistd.h> |
| |
| #include <sys/prctl.h> |
| #include <sys/types.h> |
| #include <sys/wait.h> |
| |
| #include <list> |
| #include <ostream> |
| #include <tuple> |
| #include <vector> |
| |
| #include <process/clock.hpp> |
| #include <process/collect.hpp> |
| #include <process/defer.hpp> |
| #include <process/io.hpp> |
| #include <process/process.hpp> |
| #include <process/subprocess.hpp> |
| |
| #include <stout/strings.hpp> |
| #include <stout/unreachable.hpp> |
| |
| #include <stout/os/signals.hpp> |
| |
| #include "common/status_utils.hpp" |
| |
| #include "linux/perf.hpp" |
| |
| using namespace process; |
| |
| using process::await; |
| |
| using std::list; |
| using std::ostringstream; |
| using std::set; |
| using std::string; |
| using std::tuple; |
| using std::vector; |
| |
| namespace perf { |
| |
| // Delimiter for fields in perf stat output. |
| static const char PERF_DELIMITER[] = ","; |
| |
| // Use an empty string as the key for the parse output when sampling a |
| // set of pids. No valid cgroup can be an empty string. |
| static const char PIDS_KEY[] = ""; |
| |
| namespace internal { |
| |
| vector<string> argv( |
| const set<string>& events, |
| const set<string>& cgroups, |
| const Duration& duration) |
| { |
| vector<string> argv = { |
| "stat", |
| |
| // System-wide collection from all CPUs. |
| "--all-cpus", |
| |
| // Print counts using a CSV-style output to make it easy to import |
| // directly into spreadsheets. Columns are separated by the string |
| // specified in PERF_DELIMITER. |
| "--field-separator", PERF_DELIMITER, |
| |
| // Ensure all output goes to stdout. |
| "--log-fd", "1" |
| }; |
| |
| // Nested loop to produce all pairings of event and cgroup. |
| foreach (const string& event, events) { |
| foreach (const string& cgroup, cgroups) { |
| argv.push_back("--event"); |
| argv.push_back(event); |
| argv.push_back("--cgroup"); |
| argv.push_back(cgroup); |
| } |
| } |
| |
| argv.push_back("--"); |
| argv.push_back("sleep"); |
| argv.push_back(stringify(duration.secs())); |
| |
| return argv; |
| } |
| |
| |
| vector<string> argv( |
| const set<string>& events, |
| const string& cgroup, |
| const Duration& duration) |
| { |
| set<string> cgroups; |
| cgroups.insert(cgroup); |
| |
| return argv(events, cgroups, duration); |
| } |
| |
| |
| vector<string> argv( |
| const set<string>& events, |
| const set<pid_t>& pids, |
| const Duration& duration) |
| { |
| vector<string> argv = { |
| "stat", |
| |
| // System-wide collection from all CPUs. |
| "--all-cpus", |
| |
| // Print counts using a CSV-style output to make it easy to import |
| // directly into spreadsheets. Columns are separated by the string |
| // specified in PERF_DELIMITER. |
| "--field-separator", PERF_DELIMITER, |
| |
| // Ensure all output goes to stdout. |
| "--log-fd", "1", |
| |
| "--event", strings::join(",", events), |
| "--pid", strings::join(",", pids), |
| "--", |
| "sleep", stringify(duration.secs()) |
| }; |
| |
| return argv; |
| } |
| |
| |
| // Normalize a perf event name. After normalization the event name |
| // should match an event field in the PerfStatistics protobuf. |
| inline string normalize(const string& s) |
| { |
| string lower = strings::lower(s); |
| return strings::replace(lower, "-", "_"); |
| } |
| |
| |
| // Executes the 'perf' command using the supplied arguments, and |
| // returns stdout as the value of the future or a failure if calling |
| // the command fails or the command returns a non-zero exit code. |
| // |
| // TODO(bmahler): Add a process::os::shell to generalize this. |
| class Perf : public Process<Perf> |
| { |
| public: |
| Perf(const vector<string>& _argv) : argv(_argv) |
| { |
| // The first argument should be 'perf'. Note that this is |
| // a bit hacky because this class is specialized to only |
| // execute the 'perf' binary. Ultimately, this should be |
| // generalized to something like process::os::shell. |
| if (argv.empty() || argv.front() != "perf") { |
| argv.insert(argv.begin(), "perf"); |
| } |
| } |
| |
| virtual ~Perf() {} |
| |
| Future<string> future() |
| { |
| return promise.future(); |
| } |
| |
| protected: |
| virtual void initialize() |
| { |
| // Stop when no one cares. |
| promise.future().onDiscard(lambda::bind( |
| static_cast<void(*)(const UPID&, bool)>(terminate), self(), true)); |
| |
| execute(); |
| } |
| |
| virtual void finalize() |
| { |
| // Kill the perf process (if it's still running) by sending |
| // SIGTERM to the signal handler which will then SIGKILL the |
| // perf process group created by setupChild. |
| if (perf.isSome() && perf.get().status().isPending()) { |
| kill(perf.get().pid(), SIGTERM); |
| } |
| |
| promise.discard(); |
| } |
| |
| private: |
| static void signalHandler(int signal) |
| { |
| // Send SIGKILL to every process in the process group of the |
| // calling process. This will terminate both the perf process |
| // (including its children) and the bookkeeping process. |
| kill(0, SIGKILL); |
| abort(); |
| } |
| |
| // This function is invoked right before each 'perf' is exec'ed. |
| // Note that this function needs to be async signal safe. In fact, |
| // all the library functions we used in this function are async |
| // signal safe. |
| static int setupChild() |
| { |
| // Send SIGTERM to the current process if the parent (i.e., the |
| // slave) exits. Note that this function should always succeed |
| // because we are passing in a valid signal. |
| prctl(PR_SET_PDEATHSIG, SIGTERM); |
| |
| // Put the current process into a separate process group so that |
| // we can kill it and all its children easily. |
| if (setpgid(0, 0) != 0) { |
| abort(); |
| } |
| |
| // Install a SIGTERM handler which will kill the current process |
| // group. Since we already setup the death signal above, the |
| // signal handler will be triggered when the parent (i.e., the |
| // slave) exits. |
| if (os::signals::install(SIGTERM, &signalHandler) != 0) { |
| abort(); |
| } |
| |
| pid_t pid = fork(); |
| if (pid == -1) { |
| abort(); |
| } else if (pid == 0) { |
| // Child. This is the process that is going to exec the perf |
| // process if zero is returned. |
| |
| // We setup death signal for the perf process as well in case |
| // someone, though unlikely, accidentally kill the parent of |
| // this process (the bookkeeping process). |
| prctl(PR_SET_PDEATHSIG, SIGKILL); |
| |
| // NOTE: We don't need to clear the signal handler explicitly |
| // because the subsequent 'exec' will clear them. |
| return 0; |
| } else { |
| // Parent. This is the bookkeeping process which will wait for |
| // the perf process to finish. |
| |
| // Close the files to prevent interference on the communication |
| // between the slave and the perf process. |
| close(STDIN_FILENO); |
| close(STDOUT_FILENO); |
| close(STDERR_FILENO); |
| |
| // Block until the perf process finishes. |
| int status = 0; |
| if (waitpid(pid, &status, 0) == -1) { |
| abort(); |
| } |
| |
| // Forward the exit status if the perf process exits normally. |
| if (WIFEXITED(status)) { |
| _exit(WEXITSTATUS(status)); |
| } |
| |
| abort(); |
| UNREACHABLE(); |
| } |
| } |
| |
| void execute() |
| { |
| Try<Subprocess> _perf = subprocess( |
| "perf", |
| argv, |
| Subprocess::PIPE(), |
| Subprocess::PIPE(), |
| Subprocess::PIPE(), |
| None(), |
| None(), |
| setupChild); |
| |
| if (_perf.isError()) { |
| promise.fail("Failed to launch perf process: " + _perf.error()); |
| terminate(self()); |
| return; |
| } |
| perf = _perf.get(); |
| |
| // Wait for the process to exit. |
| await(perf.get().status(), |
| io::read(perf.get().out().get()), |
| io::read(perf.get().err().get())) |
| .onReady(defer(self(), [this](const tuple< |
| Future<Option<int>>, |
| Future<string>, |
| Future<string>>& results) { |
| Future<Option<int>> status = std::get<0>(results); |
| Future<string> output = std::get<1>(results); |
| |
| Option<Error> error = None(); |
| |
| if (!status.isReady()) { |
| error = Error("Failed to execute perf: " + |
| (status.isFailed() ? status.failure() : "discarded")); |
| } else if (status.get().isNone()) { |
| error = Error("Failed to execute perf: failed to reap"); |
| } else if (status.get().get() != 0) { |
| error = Error("Failed to execute perf: " + |
| WSTRINGIFY(status.get().get())); |
| } else if (!output.isReady()) { |
| error = Error("Failed to read perf output: " + |
| (output.isFailed() ? output.failure() : "discarded")); |
| } |
| |
| if (error.isSome()) { |
| promise.fail(error.get().message); |
| terminate(self()); |
| return; |
| } |
| |
| promise.set(output.get()); |
| terminate(self()); |
| return; |
| })); |
| } |
| |
| vector<string> argv; |
| Promise<string> promise; |
| Option<Subprocess> perf; |
| }; |
| |
| |
| // Helper to select a single key from the hashmap of perf statistics. |
| Future<mesos::PerfStatistics> select( |
| const string& key, |
| const hashmap<string, mesos::PerfStatistics>& statistics) |
| { |
| return statistics.get(key).get(); |
| } |
| |
| |
| Future<hashmap<string, mesos::PerfStatistics>> sample( |
| const vector<string>& argv, |
| const Duration& duration) |
| { |
| Time start = Clock::now(); |
| |
| Perf* perf = new Perf(argv); |
| Future<string> future = perf->future(); |
| spawn(perf, true); |
| |
| auto parse = [start, duration](const string& output) -> |
| Future<hashmap<string, mesos::PerfStatistics>> { |
| Try<hashmap<string, mesos::PerfStatistics>> parse = perf::parse(output); |
| |
| if (parse.isError()) { |
| return Failure("Failed to parse perf sample: " + parse.error()); |
| } |
| |
| foreachvalue (mesos::PerfStatistics& statistics, parse.get()) { |
| statistics.set_timestamp(start.secs()); |
| statistics.set_duration(duration.secs()); |
| } |
| |
| return parse.get(); |
| }; |
| |
| return future.then(parse); |
| } |
| |
| } // namespace internal { |
| |
| |
| Future<mesos::PerfStatistics> sample( |
| const set<string>& events, |
| pid_t pid, |
| const Duration& duration) |
| { |
| set<pid_t> pids; |
| pids.insert(pid); |
| return sample(events, pids, duration); |
| } |
| |
| |
| Future<mesos::PerfStatistics> sample( |
| const set<string>& events, |
| const set<pid_t>& pids, |
| const Duration& duration) |
| { |
| if (!supported()) { |
| return Failure("Perf is not supported"); |
| } |
| |
| return internal::sample(internal::argv(events, pids, duration), duration) |
| .then(lambda::bind(&internal::select, PIDS_KEY, lambda::_1)); |
| } |
| |
| |
| Future<mesos::PerfStatistics> sample( |
| const set<string>& events, |
| const string& cgroup, |
| const Duration& duration) |
| { |
| set<string> cgroups; |
| cgroups.insert(cgroup); |
| return sample(events, cgroups, duration) |
| .then(lambda::bind(&internal::select, cgroup, lambda::_1)); |
| } |
| |
| |
| Future<hashmap<string, mesos::PerfStatistics>> sample( |
| const set<string>& events, |
| const set<string>& cgroups, |
| const Duration& duration) |
| { |
| if (!supported()) { |
| return Failure("Perf is not supported"); |
| } |
| |
| return internal::sample(internal::argv(events, cgroups, duration), duration); |
| } |
| |
| |
| bool valid(const set<string>& events) |
| { |
| ostringstream command; |
| |
| // Log everything to stderr which is then redirected to /dev/null. |
| command << "perf stat --log-fd 2"; |
| foreach (const string& event, events) { |
| command << " --event " << event; |
| } |
| command << " true 2>/dev/null"; |
| |
| return (os::system(command.str()) == 0); |
| } |
| |
| |
| bool supported() |
| { |
| // Require Linux kernel version >= 2.6.38 for "-x" and >= 2.6.39 for |
| // "--cgroup" |
| Try<Version> release = os::release(); |
| |
| // This is not expected to ever be an Error. |
| CHECK_SOME(release); |
| |
| return release.get() >= Version(2, 6, 39); |
| } |
| |
| |
| Try<hashmap<string, mesos::PerfStatistics>> parse(const string& output) |
| { |
| hashmap<string, mesos::PerfStatistics> statistics; |
| |
| foreach (const string& line, strings::tokenize(output, "\n")) { |
| vector<string> tokens = strings::tokenize(line, PERF_DELIMITER); |
| // Expected format for an output line is either: |
| // value,event (when sampling pids) |
| // value,event,cgroup (when sampling a cgroup) |
| // assuming PERF_DELIMITER = ",". |
| if (tokens.size() < 2 || tokens.size() > 3) { |
| return Error("Unexpected perf output at line: " + line); |
| } |
| |
| const string value = tokens[0]; |
| const string event = internal::normalize(tokens[1]); |
| // Use the special PIDS_KEY when sampling pids. |
| const string cgroup = (tokens.size() == 3 ? tokens[2] : PIDS_KEY); |
| |
| if (!statistics.contains(cgroup)) { |
| statistics.put(cgroup, mesos::PerfStatistics()); |
| } |
| |
| const google::protobuf::Reflection* reflection = |
| statistics[cgroup].GetReflection(); |
| const google::protobuf::FieldDescriptor* field = |
| statistics[cgroup].GetDescriptor()->FindFieldByName(event); |
| if (!field) { |
| return Error("Unexpected perf output at line: " + line); |
| } |
| |
| if (value == "<not supported>") { |
| LOG(WARNING) << "Unsupported perf counter, ignoring: " << line; |
| continue; |
| } |
| |
| switch (field->type()) { |
| case google::protobuf::FieldDescriptor::TYPE_DOUBLE: |
| { |
| Try<double> number = |
| (value == "<not counted>") ? 0 : numify<double>(value); |
| |
| if (number.isError()) { |
| return Error("Unable to parse perf value at line: " + line); |
| } |
| |
| reflection->SetDouble(&(statistics[cgroup]), field, number.get()); |
| break; |
| } |
| case google::protobuf::FieldDescriptor::TYPE_UINT64: |
| { |
| Try<uint64_t> number = |
| (value == "<not counted>") ? 0 : numify<uint64_t>(value); |
| |
| if (number.isError()) { |
| return Error("Unable to parse perf value at line: " + line); |
| } |
| |
| reflection->SetUInt64(&(statistics[cgroup]), field, number.get()); |
| break; |
| } |
| default: |
| return Error("Unsupported perf field type at line: " + line); |
| } |
| } |
| |
| return statistics; |
| } |
| |
| } // namespace perf { |