blob: 380b7a74f6894fde6af5548c777aec02017e4b65 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <string>
#include <mesos/scheduler.hpp>
#include <mesos/authorizer/acls.hpp>
#include <process/defer.hpp>
#include <process/process.hpp>
#include <process/timeout.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/stopwatch.hpp>
#include <stout/strings.hpp>
#include "examples/flags.hpp"
#include "logging/logging.hpp"
using namespace mesos;
using namespace process;
using std::string;
using std::vector;
constexpr char FRAMEWORK_NAME[] = "Load Generator Framework (C++)";
// Generate load towards the master (by repeatedly sending
// ReconcileTasksMessages) at the specified rate and for the
// specified duration.
class LoadGeneratorProcess : public Process<LoadGeneratorProcess>
{
public:
LoadGeneratorProcess(
SchedulerDriver* _driver,
double _qps,
const Option<Duration>& _duration)
: driver(_driver), qps(_qps), duration(_duration), messages(0) {}
void initialize() override
{
dispatch(self(), &Self::generate);
}
private:
void generate()
{
watch.start();
while (true) {
Duration elapsed = watch.elapsed();
if (duration.isSome() && elapsed >= duration.get()) {
LOG(INFO) << "LoadGenerator generated " << messages
<< " messages in " << elapsed << " (throughput = "
<< (messages / elapsed.secs()) << " messages/sec)";
LOG(INFO) << "Stopping LoadGenerator and scheduler driver";
terminate(self());
driver->stop();
return;
}
Stopwatch reconcile;
reconcile.start();
driver->reconcileTasks(vector<TaskStatus>());
messages++;
// Compensate for the driver call overhead.
os::sleep(std::max(
Duration::zero(),
Seconds(1) / qps - reconcile.elapsed()));
}
}
SchedulerDriver* driver;
double qps;
const Option<Duration> duration;
int messages;
Stopwatch watch;
};
class LoadGenerator
{
public:
LoadGenerator(
SchedulerDriver* driver,
double qps,
const Option<Duration>& duration)
{
process = new LoadGeneratorProcess(driver, qps, duration);
spawn(process);
}
~LoadGenerator()
{
// Could be already terminated.
terminate(process);
wait(process);
delete process;
}
private:
LoadGeneratorProcess* process;
};
// This scheduler does one thing: generating network traffic towards
// the master.
class LoadGeneratorScheduler : public Scheduler
{
public:
LoadGeneratorScheduler(double _qps, const Option<Duration>& _duration)
: generator(nullptr), qps(_qps), duration(_duration) {}
~LoadGeneratorScheduler() override
{
delete generator;
}
void registered(SchedulerDriver* driver,
const FrameworkID&,
const MasterInfo& masterInfo) override
{
LOG(INFO) << "Registered with " << masterInfo.pid();
if (generator == nullptr) {
LOG(INFO) << "Starting LoadGenerator at QPS: " << qps;
generator = new LoadGenerator(driver, qps, duration);
}
}
void reregistered(
SchedulerDriver* driver,
const MasterInfo& masterInfo) override
{
LOG(INFO) << "Reregistered with " << masterInfo.pid();
if (generator == nullptr) {
LOG(INFO) << "Starting LoadGenerator at QPS: " << qps;
generator = new LoadGenerator(driver, qps, duration);
}
}
void disconnected(SchedulerDriver* driver) override
{
LOG(INFO) << "Disconnected!";
delete generator;
generator = nullptr;
LOG(INFO) << "Stopped LoadGenerator";
}
void resourceOffers(
SchedulerDriver* driver,
const vector<Offer>& offers) override
{
LOG(INFO) << "Received " << offers.size()
<< " resource offers. Declining them";
Filters filters;
// Refuse for eternity so Master doesn't send us the same
// offers.
filters.set_refuse_seconds(Duration::max().secs());
for (size_t i = 0; i < offers.size(); i++) {
driver->declineOffer(offers[i].id(), filters);
}
}
void offerRescinded(SchedulerDriver*, const OfferID&) override {}
void statusUpdate(SchedulerDriver*, const TaskStatus&) override {}
void frameworkMessage(
SchedulerDriver*,
const ExecutorID&,
const SlaveID&,
const string&) override {}
void slaveLost(SchedulerDriver*, const SlaveID&) override {}
void executorLost(
SchedulerDriver*,
const ExecutorID&,
const SlaveID&,
int) override {}
void error(SchedulerDriver*, const string& error) override
{
// Terminating process with EXIT here because we cannot interrupt
// LoadGenerator's long-running loop.
EXIT(EXIT_FAILURE) << "Error received: " << error;
}
private:
LoadGenerator* generator;
const double qps;
const Option<Duration> duration;
};
class Flags : public virtual mesos::internal::examples::Flags
{
public:
Flags()
{
add(&Flags::qps,
"qps",
"Required. Generate load at this specified rate (queries per second).\n"
"Note that this rate is an upper bound and the real rate may be less.\n"
"Also, setting the qps too high can cause the local machine to run\n"
"out of ephemeral ports during master failover (if scheduler driver\n"
"fails to detect master change soon enough after the old master exits\n"
"and the scheduler keeps trying to connect to the dead master. See\n"
"MESOS-1560 for more details)");
add(&Flags::duration,
"duration",
"Run LoadGenerator for the specified duration.\n"
"Without this option this framework would keep generating load\n"
"forever as long as it is connected to the master");
}
double qps;
Option<Duration> duration;
};
int main(int argc, char** argv)
{
Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
if (load.isError()) {
std::cerr << flags.usage(load.error()) << std::endl;
return EXIT_FAILURE;
}
if (flags.help) {
std::cout << flags.usage() << std::endl;
return EXIT_SUCCESS;
}
mesos::internal::logging::initialize(argv[0], false);
// Log any flag warnings (after logging is initialized).
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
if (flags.qps <= 0.0) {
EXIT(EXIT_FAILURE) << "Flag '--qps' needs to be greater than zero";
}
LoadGeneratorScheduler scheduler(flags.qps, flags.duration);
FrameworkInfo framework;
framework.set_user(""); // Have Mesos fill in the current user.
framework.set_principal(flags.principal);
framework.set_name(FRAMEWORK_NAME);
framework.set_checkpoint(flags.checkpoint);
framework.add_roles(flags.role);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::RESERVATION_REFINEMENT);
framework.set_checkpoint(flags.checkpoint);
if (flags.master == "local") {
// Configure master.
os::setenv("MESOS_ROLES", flags.role);
os::setenv("MESOS_AUTHENTICATE_FRAMEWORKS", stringify(flags.authenticate));
ACLs acls;
ACL::RegisterFramework* acl = acls.add_register_frameworks();
acl->mutable_principals()->set_type(ACL::Entity::ANY);
acl->mutable_roles()->add_values("*");
os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
}
MesosSchedulerDriver* driver;
if (flags.authenticate) {
LOG(INFO) << "Enabling authentication for the framework";
Credential credential;
credential.set_principal(flags.principal);
if (flags.secret.isSome()) {
credential.set_secret(flags.secret.get());
}
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master,
credential);
} else {
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master);
}
int status = driver->run() == DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
// Ensure that the driver process terminates.
driver->stop();
delete driver;
return status;
}