blob: 2fd82ad5749e3948c590ce5e9816566a3627b885 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "resource_provider/daemon.hpp"
#include <utility>
#include <glog/logging.h>
#include <mesos/type_utils.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/json.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include "common/http.hpp"
#include "common/validation.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "resource_provider/local.hpp"
namespace http = process::http;
using std::list;
using std::string;
using std::vector;
using process::await;
using process::defer;
using process::dispatch;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
using process::ProcessBase;
using process::spawn;
using process::terminate;
using process::wait;
using process::http::authentication::Principal;
namespace mesos {
namespace internal {
// Directory under the resource provider config directory to store
// temporarily files for adding and updating resource provider config
// files atomically (all-or-nothing).
constexpr char STAGING_DIR[] = ".staging";
class LocalResourceProviderDaemonProcess
: public Process<LocalResourceProviderDaemonProcess>
{
public:
LocalResourceProviderDaemonProcess(
const http::URL& _url,
const string& _workDir,
const Option<string>& _configDir,
SecretGenerator* _secretGenerator,
bool _strict)
: ProcessBase(process::ID::generate("local-resource-provider-daemon")),
url(_url),
workDir(_workDir),
configDir(_configDir),
secretGenerator(_secretGenerator),
strict(_strict) {}
LocalResourceProviderDaemonProcess(
const LocalResourceProviderDaemonProcess& other) = delete;
LocalResourceProviderDaemonProcess& operator=(
const LocalResourceProviderDaemonProcess& other) = delete;
void start(const SlaveID& _slaveId);
Future<bool> add(const ResourceProviderInfo& info);
Future<bool> update(const ResourceProviderInfo& info);
Future<Nothing> remove(const string& type, const string& name);
protected:
void initialize() override;
private:
struct ProviderData
{
ProviderData(const string& _path, const ResourceProviderInfo& _info)
: path(_path), info(_info), version(id::UUID::random()) {}
const string path;
ResourceProviderInfo info;
Option<string> authToken;
// The `version` is used to check if `provider` holds a resource
// provider instance that is in sync with the current config.
id::UUID version;
Owned<LocalResourceProvider> provider;
// If set, it means that the resource provider is being removed, and the
// future would be completed once the removal is done. Note that this object
// will be destructed as soon as the future is ready.
Option<Future<Nothing>> removing;
};
Try<Nothing> load(const string& path);
Try<Nothing> save(const string& path, const ResourceProviderInfo& info);
// NOTE: `launch` should only be called once for each config version.
// It will pick up the latest config to launch the resource provider.
Future<Nothing> launch(
const string& type,
const string& name);
Future<Nothing> _launch(
const string& type,
const string& name,
const id::UUID& version,
const Option<string>& authToken);
Future<Option<string>> generateAuthToken(const ResourceProviderInfo& info);
Future<Nothing> cleanupContainers(
const ResourceProviderInfo& info,
const Option<string>& authToken);
const http::URL url;
const string workDir;
const Option<string> configDir;
SecretGenerator* const secretGenerator;
const bool strict;
Option<SlaveID> slaveId;
hashmap<string, hashmap<string, ProviderData>> providers;
};
void LocalResourceProviderDaemonProcess::start(const SlaveID& _slaveId)
{
// NOTE: It's possible that the slave receives multiple
// `SlaveRegisteredMessage`s and detects a disconnection in between.
// In that case, `start` will be called multiple times from
// `Slave::registered`.
if (slaveId.isSome()) {
CHECK_EQ(slaveId.get(), _slaveId)
<< "Cannot start local resource provider daemon with id " << _slaveId
<< " (expected: " << slaveId.get() << ")";
return;
}
slaveId = _slaveId;
foreachkey (const string& type, providers) {
foreachpair (const string& name,
const ProviderData& data,
providers[type]) {
if (data.removing.isSome()) {
continue;
}
auto error = [=](const string& message) {
LOG(ERROR) << "Failed to launch resource provider with type '" << type
<< "' and name '" << name << "': " << message;
};
launch(type, name)
.onFailed(error)
.onDiscarded(std::bind(error, "future discarded"));
}
}
}
Future<bool> LocalResourceProviderDaemonProcess::add(
const ResourceProviderInfo& info)
{
CHECK(!info.has_id()); // Should have already been validated by the agent.
if (configDir.isNone()) {
return Failure("Missing required flag --resource_provider_config_dir");
}
// Return true if the info has been added for idempotency.
if (providers[info.type()].contains(info.name())) {
const ProviderData& data = providers[info.type()].at(info.name());
if (data.removing.isSome()) {
return Failure(
"Failed to add resource provider with type '" + info.type() +
"' and name '" + info.name() + "' as a removal is still in progress");
}
return data.info == info;
}
// Generate a filename for the config.
// NOTE: We use the following template `<type>.<name>.<uuid>.json`
// with a random UUID to generate a new filename to avoid any conflict
// with existing ad-hoc config files.
const string path = path::join(
configDir.get(),
strings::join(".", info.type(), info.name(), id::UUID::random(), "json"));
LOG(INFO) << "Creating new config file '" << path << "'";
Try<Nothing> _save = save(path, info);
if (_save.isError()) {
return Failure(
"Failed to write config file '" + path + "': " + _save.error());
}
providers[info.type()].put(info.name(), {path, info});
// Launch the resource provider if the daemon is already started.
if (slaveId.isSome()) {
auto err = [](const ResourceProviderInfo& info, const string& message) {
LOG(ERROR)
<< "Failed to launch resource provider with type '" << info.type()
<< "' and name '" << info.name() << "': " << message;
};
launch(info.type(), info.name())
.onFailed(std::bind(err, info, lambda::_1))
.onDiscarded(std::bind(err, info, "future discarded"));
}
return true;
}
Future<bool> LocalResourceProviderDaemonProcess::update(
const ResourceProviderInfo& info)
{
CHECK(!info.has_id()); // Should have already been validated by the agent.
if (configDir.isNone()) {
return Failure("Missing required flag --resource_provider_config_dir");
}
if (!providers[info.type()].contains(info.name())) {
return false;
}
ProviderData& data = providers[info.type()].at(info.name());
if (data.removing.isSome()) {
return Failure(
"Failed to update resource provider with type '" + info.type() +
"' and name '" + info.name() + "' as a removal is still in progress");
}
// Return true if the info has been updated for idempotency.
if (data.info == info) {
return true;
}
Try<Nothing> _save = save(data.path, info);
if (_save.isError()) {
return Failure(
"Failed to write config file '" + data.path + "': " + _save.error());
}
data.info = info;
// Update `version` to indicate that the config has been updated.
data.version = id::UUID::random();
// Launch the resource provider if the daemon is already started.
if (slaveId.isSome()) {
auto err = [](const ResourceProviderInfo& info, const string& message) {
LOG(ERROR)
<< "Failed to launch resource provider with type '" << info.type()
<< "' and name '" << info.name() << "': " << message;
};
launch(info.type(), info.name())
.onFailed(std::bind(err, info, lambda::_1))
.onDiscarded(std::bind(err, info, "future discarded"));
}
return true;
}
Future<Nothing> LocalResourceProviderDaemonProcess::remove(
const string& type,
const string& name)
{
if (configDir.isNone()) {
return Failure("Missing required flag --resource_provider_config_dir");
}
// Do nothing if the info has been removed for idempotency.
if (!providers[type].contains(name)) {
return Nothing();
}
ProviderData& data = providers[type].at(name);
// Return the same future if it is being removed for idempotency.
if (data.removing.isSome() && data.removing->isPending()) {
return data.removing.get();
}
// Destruct the resource provider instance to stop its container daemons, then
// do a best-effort cleanup of the standalone containers launched by the
// removed resource provider.
// TODO(chhsiao): This is not ideal since the daemon does not know how to
// perform resource-provider-specific cleanups. We should refactor this into a
// `LocalResourceProvider::stop` virtual function. However this also means
// that we need to ensure that we must have a resource provider instance with
// a running actor to invoke `stop`. Given that `launch` is asynchronous, we
// need to carefully redesign the state machine, and the semantics of the
// `{ADD,UPDATE,REMOVE}_RESOURCE_PROVIDER_CONFIG` API calls might be affected.
// In addition, we should consider how to reconcile orphaned containers.
data.provider.reset();
data.removing = cleanupContainers(data.info, data.authToken)
.then(defer(self(), [this, type, name]() -> Future<Nothing> {
Try<Nothing> rm = os::rm(providers[type].at(name).path);
if (rm.isError()) {
return Failure(
"Failed to remove config file '" + providers[type].at(name).path +
"': " + rm.error());
}
providers[type].erase(name);
return Nothing();
}));
return data.removing.get();
}
void LocalResourceProviderDaemonProcess::initialize()
{
if (configDir.isNone()) {
return;
}
Try<list<string>> entries = os::ls(configDir.get());
if (entries.isError()) {
LOG(FATAL) << "Unable to list the resource provider config directory '"
<< configDir.get() << "': " << entries.error();
return;
}
foreach (const string& entry, entries.get()) {
const string path = path::join(configDir.get(), entry);
// Skip subdirectories in the resource provider config directory,
// including the staging directory.
if (os::stat::isdir(path)) {
continue;
}
Try<Nothing> loading = load(path);
if (loading.isError()) {
LOG(ERROR) << "Failed to load resource provider config '"
<< path << "': " << loading.error();
}
}
}
Try<Nothing> LocalResourceProviderDaemonProcess::load(const string& path)
{
Try<string> read = os::read(path);
if (read.isError()) {
return Error("Failed to read the config file: " + read.error());
}
Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
if (json.isError()) {
return Error("Failed to parse the JSON config: " + json.error());
}
Try<ResourceProviderInfo> info =
::protobuf::parse<ResourceProviderInfo>(json.get());
if (info.isError()) {
return Error("Not a valid resource provider config: " + info.error());
}
if (info->has_id()) {
return Error("'ResourceProviderInfo.id' must not be set");
}
// Ensure that ('type', 'name') pair is unique.
if (providers[info->type()].contains(info->name())) {
return Error(
"Multiple resource providers with type '" + info->type() +
"' and name '" + info->name() + "'");
}
providers[info->type()].put(info->name(), {path, std::move(info.get())});
return Nothing();
}
// NOTE: We provide atomic (all-or-nothing) semantics here by always
// writing to a temporary file to the staging directory first then using
// `os::rename` to atomically move it to the desired path.
Try<Nothing> LocalResourceProviderDaemonProcess::save(
const string& path,
const ResourceProviderInfo& info)
{
CHECK_SOME(configDir);
// NOTE: We create the staging directory in the resource provider
// config directory to make sure that the renaming below does not
// cross devices (MESOS-2319).
// TODO(chhsiao): Consider adding a way to garbage collect the staging
// directory.
const string stagingDir = path::join(configDir.get(), STAGING_DIR);
Try<Nothing> mkdir = os::mkdir(stagingDir);
if (mkdir.isError()) {
return Error(
"Failed to create directory '" + stagingDir + "': " + mkdir.error());
}
const string stagingPath = path::join(stagingDir, Path(path).basename());
Try<Nothing> write = os::write(stagingPath, stringify(JSON::protobuf(info)));
if (write.isError()) {
// Try to remove the temporary file on error.
os::rm(stagingPath);
return Error(
"Failed to write temporary file '" + stagingPath + "': " +
write.error());
}
Try<Nothing> rename = os::rename(stagingPath, path);
if (rename.isError()) {
// Try to remove the temporary file on error.
os::rm(stagingPath);
return Error(
"Failed to rename '" + stagingPath + "' to '" + path + "': " +
rename.error());
}
return Nothing();
}
Future<Nothing> LocalResourceProviderDaemonProcess::launch(
const string& type,
const string& name)
{
CHECK_SOME(slaveId);
CHECK(providers[type].contains(name));
ProviderData& data = providers[type].at(name);
CHECK(data.removing.isNone());
// Destruct the previous resource provider (which will synchronously
// terminate its actor and driver) if there is one.
data.provider.reset();
return generateAuthToken(data.info)
.then(defer(self(), &Self::_launch, type, name, data.version, lambda::_1));
}
Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
const string& type,
const string& name,
const id::UUID& version,
const Option<string>& authToken)
{
// If the resource provider config is removed, abort the launch sequence.
if (!providers[type].contains(name)) {
return Nothing();
}
ProviderData& data = providers[type].at(name);
// If the resource provider config is being removed, nothing needs to be done.
if (data.removing.isSome()) {
return Nothing();
}
// If there is a version mismatch, abort the launch sequence since
// `authToken` might be outdated. The callback updating the version
// should have dispatched another launch sequence.
if (version != data.version) {
return Nothing();
}
Try<Owned<LocalResourceProvider>> provider = LocalResourceProvider::create(
url, workDir, data.info, slaveId.get(), authToken, strict);
if (provider.isError()) {
return Failure(
"Failed to create resource provider with type '" + type +
"' and name '" + name + "': " + provider.error());
}
data.authToken = authToken;
data.provider = std::move(provider.get());
return Nothing();
}
// Generates a secret for local resource provider authentication if needed.
Future<Option<string>> LocalResourceProviderDaemonProcess::generateAuthToken(
const ResourceProviderInfo& info)
{
if (secretGenerator == nullptr) {
return None();
}
return secretGenerator->generate(LocalResourceProvider::principal(info))
.then(defer(self(), [](const Secret& secret) -> Future<Option<string>> {
Option<Error> error = common::validation::validateSecret(secret);
if (error.isSome()) {
return Failure(
"Failed to validate generated secret: " + error->message);
} else if (secret.type() != Secret::VALUE) {
return Failure(
"Expecting generated secret to be of VALUE type instead of " +
stringify(secret.type()) + " type; " +
"only VALUE type secrets are supported at this time");
}
CHECK(secret.has_value());
return secret.value().data();
}));
}
Future<Nothing> LocalResourceProviderDaemonProcess::cleanupContainers(
const ResourceProviderInfo& info,
const Option<string>& authToken)
{
const Principal principal = LocalResourceProvider::principal(info);
CHECK(principal.claims.contains("cid_prefix"));
const string& cidPrefix = principal.claims.at("cid_prefix");
LOG(INFO) << "Cleaning up containers prefixed by '" << cidPrefix << "'";
// TODO(chhsiao): Consider using a more reliable way to get the v1 endpoint.
http::URL agentUrl = url;
agentUrl.path = Path(url.path).dirname();
http::Headers headers{{"Accept", stringify(ContentType::PROTOBUF)}};
if (authToken.isSome()) {
headers["Authorization"] = "Bearer " + authToken.get();
}
agent::Call call;
call.set_type(agent::Call::GET_CONTAINERS);
call.mutable_get_containers()->set_show_nested(false);
call.mutable_get_containers()->set_show_standalone(true);
return http::post(
agentUrl,
headers,
serialize(ContentType::PROTOBUF, evolve(call)),
stringify(ContentType::PROTOBUF))
.then(defer(self(), [=](
const http::Response& httpResponse) -> Future<Nothing> {
if (httpResponse.status != http::OK().status) {
return Failure(
"Failed to get containers: Unexpected response '" +
httpResponse.status + "' (" + httpResponse.body + ")");
}
Try<v1::agent::Response> v1Response = deserialize<v1::agent::Response>(
ContentType::PROTOBUF, httpResponse.body);
if (v1Response.isError()) {
return Failure("Failed to get containers: " + v1Response.error());
}
vector<Future<Nothing>> futures;
agent::Response response = devolve(v1Response.get());
foreach (const agent::Response::GetContainers::Container& container,
response.get_containers().containers()) {
const ContainerID& containerId = container.container_id();
if (!strings::startsWith(containerId.value(), cidPrefix)) {
continue;
}
// NOTE: We skip containers that are not actually running by checking
// their `executor_pid`s to avoid `ESRCH` errors or killing an arbitrary
// process. But we might skip the ones that are being launched as well.
if (!container.has_container_status() ||
!container.container_status().has_executor_pid()) {
LOG(WARNING)
<< "Skipped killing container '" << containerId
<< "' because it is not running";
continue;
}
agent::Call call;
call.set_type(agent::Call::KILL_CONTAINER);
call.mutable_kill_container()->mutable_container_id()
->CopyFrom(containerId);
LOG(INFO) << "Killing container '" << containerId << "'";
futures.push_back(http::post(
agentUrl,
headers,
serialize(ContentType::PROTOBUF, evolve(call)),
stringify(ContentType::PROTOBUF))
.then(defer(self(), [=](
const http::Response& response) -> Future<Nothing> {
if (response.status == http::NotFound().status) {
LOG(WARNING)
<< "Skipped waiting for container '" << containerId
<< "' because it no longer exists";
return Nothing();
}
if (response.status != http::OK().status) {
return Failure(
"Failed to kill container '" + stringify(containerId) +
"': Unexpected response '" + response.status + "' (" +
response.body + ")");
}
LOG(INFO) << "Waiting for container '" << containerId << "'";
agent::Call call;
call.set_type(agent::Call::WAIT_CONTAINER);
call.mutable_wait_container()->mutable_container_id()
->CopyFrom(containerId);
return http::post(
agentUrl,
headers,
serialize(ContentType::PROTOBUF, evolve(call)),
stringify(ContentType::PROTOBUF))
.then([containerId](
const http::Response& response) -> Future<Nothing> {
if (response.status != http::OK().status &&
response.status != http::NotFound().status) {
return Failure(
"Failed to wait for container '" +
stringify(containerId) + "': Unexpected response '" +
response.status + "' (" + response.body + ")");
}
return Nothing();
});
})));
}
// We use await here to do a best-effort cleanup.
return await(futures)
.then([cidPrefix](
const vector<Future<Nothing>>& futures) -> Future<Nothing> {
foreach (const Future<Nothing>& future, futures) {
if (!future.isReady()) {
return Failure(
"Failed to clean up containers prefixed by '" + cidPrefix +
"': " + stringify(futures));
}
}
return Nothing();
});
}));
}
Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
const http::URL& url,
const slave::Flags& flags,
SecretGenerator* secretGenerator)
{
// We require that the config directory exists to create a daemon.
Option<string> configDir = flags.resource_provider_config_dir;
if (configDir.isSome() && !os::exists(configDir.get())) {
return Error("Config directory '" + configDir.get() + "' does not exist");
}
return new LocalResourceProviderDaemon(
url,
flags.work_dir,
configDir,
secretGenerator,
flags.strict);
}
LocalResourceProviderDaemon::LocalResourceProviderDaemon(
const http::URL& url,
const string& workDir,
const Option<string>& configDir,
SecretGenerator* secretGenerator,
bool strict)
: process(new LocalResourceProviderDaemonProcess(
url, workDir, configDir, secretGenerator, strict))
{
spawn(CHECK_NOTNULL(process.get()));
}
LocalResourceProviderDaemon::~LocalResourceProviderDaemon()
{
terminate(process.get());
wait(process.get());
}
void LocalResourceProviderDaemon::start(const SlaveID& slaveId)
{
dispatch(process.get(), &LocalResourceProviderDaemonProcess::start, slaveId);
}
Future<bool> LocalResourceProviderDaemon::add(const ResourceProviderInfo& info)
{
return dispatch(
process.get(),
&LocalResourceProviderDaemonProcess::add,
info);
}
Future<bool> LocalResourceProviderDaemon::update(
const ResourceProviderInfo& info)
{
return dispatch(
process.get(),
&LocalResourceProviderDaemonProcess::update,
info);
}
Future<Nothing> LocalResourceProviderDaemon::remove(
const string& type,
const string& name)
{
return dispatch(
process.get(),
&LocalResourceProviderDaemonProcess::remove,
type,
name);
}
} // namespace internal {
} // namespace mesos {