blob: cdc5f8314a875ababf2b17a32873775d808e1c78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/prctl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <list>
#include <ostream>
#include <tuple>
#include <vector>
#include <process/clock.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/io.hpp>
#include <process/process.hpp>
#include <process/subprocess.hpp>
#include <stout/strings.hpp>
#include <stout/unreachable.hpp>
#include <stout/os/signals.hpp>
#include "common/status_utils.hpp"
#include "linux/perf.hpp"
using namespace process;
using process::await;
using std::list;
using std::ostringstream;
using std::set;
using std::string;
using std::tuple;
using std::vector;
namespace perf {
// Delimiter for fields in perf stat output.
static const char PERF_DELIMITER[] = ",";
// Use an empty string as the key for the parse output when sampling a
// set of pids. No valid cgroup can be an empty string.
static const char PIDS_KEY[] = "";
namespace internal {
vector<string> argv(
const set<string>& events,
const set<string>& cgroups,
const Duration& duration)
{
vector<string> argv = {
"stat",
// System-wide collection from all CPUs.
"--all-cpus",
// Print counts using a CSV-style output to make it easy to import
// directly into spreadsheets. Columns are separated by the string
// specified in PERF_DELIMITER.
"--field-separator", PERF_DELIMITER,
// Ensure all output goes to stdout.
"--log-fd", "1"
};
// Nested loop to produce all pairings of event and cgroup.
foreach (const string& event, events) {
foreach (const string& cgroup, cgroups) {
argv.push_back("--event");
argv.push_back(event);
argv.push_back("--cgroup");
argv.push_back(cgroup);
}
}
argv.push_back("--");
argv.push_back("sleep");
argv.push_back(stringify(duration.secs()));
return argv;
}
vector<string> argv(
const set<string>& events,
const string& cgroup,
const Duration& duration)
{
set<string> cgroups;
cgroups.insert(cgroup);
return argv(events, cgroups, duration);
}
vector<string> argv(
const set<string>& events,
const set<pid_t>& pids,
const Duration& duration)
{
vector<string> argv = {
"stat",
// System-wide collection from all CPUs.
"--all-cpus",
// Print counts using a CSV-style output to make it easy to import
// directly into spreadsheets. Columns are separated by the string
// specified in PERF_DELIMITER.
"--field-separator", PERF_DELIMITER,
// Ensure all output goes to stdout.
"--log-fd", "1",
"--event", strings::join(",", events),
"--pid", strings::join(",", pids),
"--",
"sleep", stringify(duration.secs())
};
return argv;
}
// Normalize a perf event name. After normalization the event name
// should match an event field in the PerfStatistics protobuf.
inline string normalize(const string& s)
{
string lower = strings::lower(s);
return strings::replace(lower, "-", "_");
}
// Executes the 'perf' command using the supplied arguments, and
// returns stdout as the value of the future or a failure if calling
// the command fails or the command returns a non-zero exit code.
//
// TODO(bmahler): Add a process::os::shell to generalize this.
class Perf : public Process<Perf>
{
public:
Perf(const vector<string>& _argv) : argv(_argv)
{
// The first argument should be 'perf'. Note that this is
// a bit hacky because this class is specialized to only
// execute the 'perf' binary. Ultimately, this should be
// generalized to something like process::os::shell.
if (argv.empty() || argv.front() != "perf") {
argv.insert(argv.begin(), "perf");
}
}
virtual ~Perf() {}
Future<string> future()
{
return promise.future();
}
protected:
virtual void initialize()
{
// Stop when no one cares.
promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
execute();
}
virtual void finalize()
{
// Kill the perf process (if it's still running) by sending
// SIGTERM to the signal handler which will then SIGKILL the
// perf process group created by setupChild.
if (perf.isSome() && perf.get().status().isPending()) {
kill(perf.get().pid(), SIGTERM);
}
promise.discard();
}
private:
static void signalHandler(int signal)
{
// Send SIGKILL to every process in the process group of the
// calling process. This will terminate both the perf process
// (including its children) and the bookkeeping process.
kill(0, SIGKILL);
abort();
}
// This function is invoked right before each 'perf' is exec'ed.
// Note that this function needs to be async signal safe. In fact,
// all the library functions we used in this function are async
// signal safe.
static int setupChild()
{
// Send SIGTERM to the current process if the parent (i.e., the
// slave) exits. Note that this function should always succeed
// because we are passing in a valid signal.
prctl(PR_SET_PDEATHSIG, SIGTERM);
// Put the current process into a separate process group so that
// we can kill it and all its children easily.
if (setpgid(0, 0) != 0) {
abort();
}
// Install a SIGTERM handler which will kill the current process
// group. Since we already setup the death signal above, the
// signal handler will be triggered when the parent (i.e., the
// slave) exits.
if (os::signals::install(SIGTERM, &signalHandler) != 0) {
abort();
}
pid_t pid = fork();
if (pid == -1) {
abort();
} else if (pid == 0) {
// Child. This is the process that is going to exec the perf
// process if zero is returned.
// We setup death signal for the perf process as well in case
// someone, though unlikely, accidentally kill the parent of
// this process (the bookkeeping process).
prctl(PR_SET_PDEATHSIG, SIGKILL);
// NOTE: We don't need to clear the signal handler explicitly
// because the subsequent 'exec' will clear them.
return 0;
} else {
// Parent. This is the bookkeeping process which will wait for
// the perf process to finish.
// Close the files to prevent interference on the communication
// between the slave and the perf process.
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
// Block until the perf process finishes.
int status = 0;
if (waitpid(pid, &status, 0) == -1) {
abort();
}
// Forward the exit status if the perf process exits normally.
if (WIFEXITED(status)) {
_exit(WEXITSTATUS(status));
}
abort();
UNREACHABLE();
}
}
void execute()
{
Try<Subprocess> _perf = subprocess(
"perf",
argv,
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
None(),
None(),
setupChild);
if (_perf.isError()) {
promise.fail("Failed to launch perf process: " + _perf.error());
terminate(self());
return;
}
perf = _perf.get();
// Wait for the process to exit.
await(perf.get().status(),
io::read(perf.get().out().get()),
io::read(perf.get().err().get()))
.onReady(defer(self(), [this](const tuple<
Future<Option<int>>,
Future<string>,
Future<string>>& results) {
Future<Option<int>> status = std::get<0>(results);
Future<string> output = std::get<1>(results);
Option<Error> error = None();
if (!status.isReady()) {
error = Error("Failed to execute perf: " +
(status.isFailed() ? status.failure() : "discarded"));
} else if (status.get().isNone()) {
error = Error("Failed to execute perf: failed to reap");
} else if (status.get().get() != 0) {
error = Error("Failed to execute perf: " +
WSTRINGIFY(status.get().get()));
} else if (!output.isReady()) {
error = Error("Failed to read perf output: " +
(output.isFailed() ? output.failure() : "discarded"));
}
if (error.isSome()) {
promise.fail(error.get().message);
terminate(self());
return;
}
promise.set(output.get());
terminate(self());
return;
}));
}
vector<string> argv;
Promise<string> promise;
Option<Subprocess> perf;
};
// Helper to select a single key from the hashmap of perf statistics.
Future<mesos::PerfStatistics> select(
const string& key,
const hashmap<string, mesos::PerfStatistics>& statistics)
{
return statistics.get(key).get();
}
Future<hashmap<string, mesos::PerfStatistics>> sample(
const vector<string>& argv,
const Duration& duration)
{
Time start = Clock::now();
Perf* perf = new Perf(argv);
Future<string> future = perf->future();
spawn(perf, true);
auto parse = [start, duration](const string& output) ->
Future<hashmap<string, mesos::PerfStatistics>> {
Try<hashmap<string, mesos::PerfStatistics>> parse = perf::parse(output);
if (parse.isError()) {
return Failure("Failed to parse perf sample: " + parse.error());
}
foreachvalue (mesos::PerfStatistics& statistics, parse.get()) {
statistics.set_timestamp(start.secs());
statistics.set_duration(duration.secs());
}
return parse.get();
};
return future.then(parse);
}
} // namespace internal {
Future<mesos::PerfStatistics> sample(
const set<string>& events,
pid_t pid,
const Duration& duration)
{
set<pid_t> pids;
pids.insert(pid);
return sample(events, pids, duration);
}
Future<mesos::PerfStatistics> sample(
const set<string>& events,
const set<pid_t>& pids,
const Duration& duration)
{
if (!supported()) {
return Failure("Perf is not supported");
}
return internal::sample(internal::argv(events, pids, duration), duration)
.then(lambda::bind(&internal::select, PIDS_KEY, lambda::_1));
}
Future<mesos::PerfStatistics> sample(
const set<string>& events,
const string& cgroup,
const Duration& duration)
{
set<string> cgroups;
cgroups.insert(cgroup);
return sample(events, cgroups, duration)
.then(lambda::bind(&internal::select, cgroup, lambda::_1));
}
Future<hashmap<string, mesos::PerfStatistics>> sample(
const set<string>& events,
const set<string>& cgroups,
const Duration& duration)
{
if (!supported()) {
return Failure("Perf is not supported");
}
return internal::sample(internal::argv(events, cgroups, duration), duration);
}
bool valid(const set<string>& events)
{
ostringstream command;
// Log everything to stderr which is then redirected to /dev/null.
command << "perf stat --log-fd 2";
foreach (const string& event, events) {
command << " --event " << event;
}
command << " true 2>/dev/null";
return (os::system(command.str()) == 0);
}
bool supported()
{
// Require Linux kernel version >= 2.6.38 for "-x" and >= 2.6.39 for
// "--cgroup"
Try<Version> release = os::release();
// This is not expected to ever be an Error.
CHECK_SOME(release);
return release.get() >= Version(2, 6, 39);
}
Try<hashmap<string, mesos::PerfStatistics>> parse(const string& output)
{
hashmap<string, mesos::PerfStatistics> statistics;
foreach (const string& line, strings::tokenize(output, "\n")) {
vector<string> tokens = strings::tokenize(line, PERF_DELIMITER);
// Expected format for an output line is either:
// value,event (when sampling pids)
// value,event,cgroup (when sampling a cgroup)
// assuming PERF_DELIMITER = ",".
if (tokens.size() < 2 || tokens.size() > 3) {
return Error("Unexpected perf output at line: " + line);
}
const string value = tokens[0];
const string event = internal::normalize(tokens[1]);
// Use the special PIDS_KEY when sampling pids.
const string cgroup = (tokens.size() == 3 ? tokens[2] : PIDS_KEY);
if (!statistics.contains(cgroup)) {
statistics.put(cgroup, mesos::PerfStatistics());
}
const google::protobuf::Reflection* reflection =
statistics[cgroup].GetReflection();
const google::protobuf::FieldDescriptor* field =
statistics[cgroup].GetDescriptor()->FindFieldByName(event);
if (!field) {
return Error("Unexpected perf output at line: " + line);
}
if (value == "<not supported>") {
LOG(WARNING) << "Unsupported perf counter, ignoring: " << line;
continue;
}
switch (field->type()) {
case google::protobuf::FieldDescriptor::TYPE_DOUBLE:
{
Try<double> number =
(value == "<not counted>") ? 0 : numify<double>(value);
if (number.isError()) {
return Error("Unable to parse perf value at line: " + line);
}
reflection->SetDouble(&(statistics[cgroup]), field, number.get());
break;
}
case google::protobuf::FieldDescriptor::TYPE_UINT64:
{
Try<uint64_t> number =
(value == "<not counted>") ? 0 : numify<uint64_t>(value);
if (number.isError()) {
return Error("Unable to parse perf value at line: " + line);
}
reflection->SetUInt64(&(statistics[cgroup]), field, number.get());
break;
}
default:
return Error("Unsupported perf field type at line: " + line);
}
}
return statistics;
}
} // namespace perf {