blob: c4f449512a4cc150de8a99f44a525b96a2fc1ae2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <mesos/hook.hpp>
#include <mesos/mesos.hpp>
#include <mesos/module.hpp>
#include <mesos/module/hook.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <stout/foreach.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
#include "messages/messages.hpp"
using namespace mesos;
using std::map;
using std::string;
using process::Failure;
using process::Future;
// Must be kept in sync with variables of the same name in
// tests/hook_tests.cpp.
const char* testLabelKey = "MESOS_Test_Label";
const char* testLabelValue = "ApacheMesos";
const char* testRemoveLabelKey = "MESOS_Test_Remove_Label";
const char* testErrorLabelKey = "MESOS_Test_Error_Label";
class HookProcess : public ProtobufProcess<HookProcess>
{
public:
HookProcess() : ProcessBase(process::ID::generate("example-hook")) {}
void initialize() override
{
install<internal::HookExecuted>(
&HookProcess::handler,
&internal::HookExecuted::module);
}
void signal()
{
LOG(INFO) << "HookProcess emitting signal";
internal::HookExecuted message;
message.set_module("org_apache_mesos_TestHook");
send(self(), message);
}
void handler(const process::UPID& from, const string& module)
{
LOG(INFO) << "HookProcess caught signal: " << module;
promise.set(Nothing());
}
Future<Nothing> await()
{
return promise.future();
}
private:
process::Promise<Nothing> promise;
};
class TestHook : public Hook
{
public:
Result<Labels> masterLaunchTaskLabelDecorator(
const TaskInfo& taskInfo,
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo) override
{
LOG(INFO) << "Executing 'masterLaunchTaskLabelDecorator' hook";
Labels labels;
// Set one known label.
Label* newLabel = labels.add_labels();
newLabel->set_key(testLabelKey);
newLabel->set_value(testLabelValue);
// Remove the 'testRemoveLabelKey' label which was set by the test.
foreach (const Label& oldLabel, taskInfo.labels().labels()) {
if (oldLabel.key() != testRemoveLabelKey) {
labels.add_labels()->CopyFrom(oldLabel);
}
}
return labels;
}
Try<Nothing> masterSlaveLostHook(const SlaveInfo& slaveInfo) override
{
LOG(INFO) << "Executing 'masterSlaveLostHook' in agent '"
<< slaveInfo.id() << "'";
// TODO(nnielsen): Add argument to signal(), so we can filter messages from
// the `masterSlaveLostHook` from `slaveRemoveExecutorHook`.
// NOTE: Will not be a problem **as long as** the test doesn't start any
// tasks.
HookProcess hookProcess;
process::spawn(&hookProcess);
Future<Nothing> future =
process::dispatch(hookProcess, &HookProcess::await);
process::dispatch(hookProcess, &HookProcess::signal);
// Make sure we don't terminate the process before the message self-send has
// completed.
future.await();
process::terminate(hookProcess);
process::wait(hookProcess);
return Nothing();
}
// TODO(nnielsen): Split hook tests into multiple modules to avoid
// interference.
Result<Labels> slaveRunTaskLabelDecorator(
const TaskInfo& taskInfo,
const ExecutorInfo& executorInfo,
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo) override
{
LOG(INFO) << "Executing 'slaveRunTaskLabelDecorator' hook";
Labels labels;
// Set one known label.
Label* newLabel = labels.add_labels();
newLabel->set_key("baz");
newLabel->set_value("qux");
// Remove label which was set by test.
foreach (const Label& oldLabel, taskInfo.labels().labels()) {
if (oldLabel.key() != "foo") {
labels.add_labels()->CopyFrom(oldLabel);
}
}
return labels;
}
// In this hook, we create a new environment variable "FOO" and set
// it's value to "bar".
Result<Environment> slaveExecutorEnvironmentDecorator(
const ExecutorInfo& executorInfo) override
{
LOG(INFO) << "Executing 'slaveExecutorEnvironmentDecorator' hook";
Environment environment;
if (executorInfo.command().has_environment()) {
environment.CopyFrom(executorInfo.command().environment());
}
Environment::Variable* variable = environment.add_variables();
variable->set_name("FOO");
variable->set_value("bar");
return environment;
}
// In this hook, we check for the presence of a label, and if set
// we return a failure, effectively failing the container creation.
// Otherwise we add an environment variable to the executor and task.
// Additionally, this hook creates a file named "foo" in the container
// work directory (sandbox).
Future<Option<DockerTaskExecutorPrepareInfo>>
slavePreLaunchDockerTaskExecutorDecorator(
const Option<TaskInfo>& taskInfo,
const ExecutorInfo& executorInfo,
const string& containerName,
const string& containerWorkDirectory,
const string& mappedSandboxDirectory,
const Option<map<string, string>>& env) override
{
LOG(INFO) << "Executing 'slavePreLaunchDockerTaskExecutorDecorator' hook";
if (taskInfo.isSome()) {
foreach (const Label& label, taskInfo->labels().labels()) {
if (label.key() == testErrorLabelKey) {
return Failure("Spotted error label");
}
}
}
DockerTaskExecutorPrepareInfo prepareInfo;
Environment* taskEnvironment =
prepareInfo.mutable_taskenvironment();
Environment::Variable* variable = taskEnvironment->add_variables();
variable->set_name("HOOKTEST_TASK");
variable->set_value("foo");
Environment* executorEnvironment =
prepareInfo.mutable_executorenvironment();
variable = executorEnvironment->add_variables();
variable->set_name("HOOKTEST_EXECUTOR");
variable->set_value("bar");
os::touch(path::join(containerWorkDirectory, "foo"));
return prepareInfo;
}
Try<Nothing> slavePostFetchHook(
const ContainerID& containerId,
const string& directory) override
{
LOG(INFO) << "Executing 'slavePostFetchHook'";
const string path = path::join(directory, "post_fetch_hook");
if (os::exists(path)) {
return os::rm(path);
} else {
return Nothing();
}
}
// This hook locates the file created by environment decorator hook
// and deletes it.
Try<Nothing> slaveRemoveExecutorHook(
const FrameworkInfo& frameworkInfo,
const ExecutorInfo& executorInfo) override
{
LOG(INFO) << "Executing 'slaveRemoveExecutorHook'";
// Send a HookExecuted message to ourself. The hook test
// "VerifySlaveLaunchExecutorHook" will wait for the testing
// infrastructure to intercept this message. The intercepted message
// indicates successful execution of this hook.
HookProcess hookProcess;
process::spawn(&hookProcess);
Future<Nothing> future =
process::dispatch(hookProcess, &HookProcess::await);
process::dispatch(hookProcess, &HookProcess::signal);
// Make sure we don't terminate the process before the message self-send has
// completed.
future.await();
process::terminate(hookProcess);
process::wait(hookProcess);
return Nothing();
}
Result<TaskStatus> slaveTaskStatusDecorator(
const FrameworkID& frameworkId,
const TaskStatus& status) override
{
LOG(INFO) << "Executing 'slaveTaskStatusDecorator' hook";
Labels labels;
// Set one known label.
Label* newLabel = labels.add_labels();
newLabel->set_key("bar");
newLabel->set_value("qux");
// Remove label which was set by test.
foreach (const Label& oldLabel, status.labels().labels()) {
if (oldLabel.key() != "foo") {
labels.add_labels()->CopyFrom(oldLabel);
}
}
TaskStatus result;
result.mutable_labels()->CopyFrom(labels);
// Set an IP address, a network isolation group, and a known label
// in network info. This data is later validated by the
// 'HookTest.VerifySlaveTaskStatusDecorator' test.
NetworkInfo* networkInfo =
result.mutable_container_status()->add_network_infos();
NetworkInfo::IPAddress* ipAddress = networkInfo->add_ip_addresses();
ipAddress->set_ip_address("4.3.2.1");
networkInfo->add_groups("public");
Label* networkInfoLabel = networkInfo->mutable_labels()->add_labels();
networkInfoLabel->set_key("net_foo");
networkInfoLabel->set_value("net_bar");
return result;
}
Result<Resources> slaveResourcesDecorator(
const SlaveInfo& slaveInfo) override
{
LOG(INFO) << "Executing 'slaveResourcesDecorator' hook";
Resources resources;
// Remove the existing "cpus" resource, it will be overwritten by the
// current hook. Keep other resources unchanged.
foreach (const Resource& resource, slaveInfo.resources()) {
if (resource.name() != "cpus") {
resources += resource;
}
}
// Force the value of "cpus" to 4 and add a new custom resource named "foo"
// of type set.
resources += Resources::parse("cpus:4;foo:{bar,baz}").get();
return resources;
}
Result<Attributes> slaveAttributesDecorator(
const SlaveInfo& slaveInfo) override
{
LOG(INFO) << "Executing 'slaveAttributesDecorator' hook";
Attributes attributes = slaveInfo.attributes();
attributes.add(Attributes::parse("rack", "rack1"));
return attributes;
}
};
static Hook* createHook(const Parameters& parameters)
{
return new TestHook();
}
// Declares a Hook module named 'org_apache_mesos_TestHook'.
mesos::modules::Module<Hook> org_apache_mesos_TestHook(
MESOS_MODULE_API_VERSION,
MESOS_VERSION,
"Apache Mesos",
"modules@mesos.apache.org",
"Test Hook module.",
nullptr,
createHook);