blob: 3e6b64bae1c56d3ca27edd0debac428b47f17d04 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "resource_provider/registrar.hpp"
#include <algorithm>
#include <deque>
#include <string>
#include <utility>
#include <glog/logging.h>
#include <mesos/type_utils.hpp>
#include <mesos/state/in_memory.hpp>
#ifndef __WINDOWS__
#include <mesos/state/leveldb.hpp>
#endif // __WINDOWS__
#include <mesos/state/protobuf.hpp>
#include <process/defer.hpp>
#include <process/process.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
#include <stout/unreachable.hpp>
#include "master/registrar.hpp"
#include "slave/paths.hpp"
using std::deque;
using std::string;
using mesos::resource_provider::registry::Registry;
using mesos::resource_provider::registry::ResourceProvider;
using mesos::state::InMemoryStorage;
#ifndef __WINDOWS__
using mesos::state::LevelDBStorage;
#endif // __WINDOWS__
using mesos::state::Storage;
using mesos::state::protobuf::Variable;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
using process::Promise;
using process::defer;
using process::spawn;
using process::terminate;
using process::wait;
namespace master = mesos::internal::master;
namespace slave = mesos::internal::slave;
namespace mesos {
namespace resource_provider {
// Runs `perform` against `registry` and remembers in `success` whether
// it finished without an error; `set()` later fulfills the promise
// with that flag. The result of `perform` is handed back unchanged.
Try<bool> Registrar::Operation::operator()(Registry* registry)
{
  Try<bool> outcome = perform(registry);

  // A `Try` is either a value or an error, so "has a value" is the
  // same as "is not an error".
  success = outcome.isSome();

  return outcome;
}
// Fulfills the underlying promise with the `success` flag recorded by
// `operator()` and returns whatever `Promise<bool>::set` returns.
bool Registrar::Operation::set()
{
  const bool fulfilled = Promise<bool>::set(success);
  return fulfilled;
}
// Creates a registrar for use on an agent. The registrar is an
// `AgentRegistrar` constructed from the given agent flags and agent ID.
Try<Owned<Registrar>> Registrar::create(
    const slave::Flags& slaveFlags,
    const SlaveID& slaveId)
{
  Owned<Registrar> registrar(new AgentRegistrar(slaveFlags, slaveId));
  return registrar;
}
// Creates a registrar for use on a master. The registrar is a
// `MasterRegistrar` wrapping the given master registrar.
Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar)
{
  Owned<Registrar> adapter(new MasterRegistrar(registrar));
  return adapter;
}
// Stores a copy of the ID of the resource provider that `perform`
// will add to the registry.
AdmitResourceProvider::AdmitResourceProvider(const ResourceProviderID& _id)
  : id(_id)
{
}
// Adds a resource provider entry carrying `id` to `registry`.
//
// Returns an error, leaving `registry` untouched, if an entry with the
// same ID is already present; otherwise returns `true` to signal that
// the registry was mutated.
Try<bool> AdmitResourceProvider::perform(Registry* registry)
{
  for (const ResourceProvider& admitted : registry->resource_providers()) {
    if (admitted.id() == id) {
      return Error("Resource provider already admitted");
    }
  }

  // Append a fresh entry and fill in its ID in place.
  registry->add_resource_providers()->mutable_id()->CopyFrom(id);

  return true; // Mutation.
}
// Stores a copy of the ID of the resource provider that `perform`
// will erase from the registry.
RemoveResourceProvider::RemoveResourceProvider(const ResourceProviderID& _id)
  : id(_id)
{
}
// Erases the first resource provider entry whose ID equals `id` from
// `registry`.
//
// Returns an error, leaving `registry` untouched, if no such entry
// exists; otherwise returns `true` to signal that the registry was
// mutated.
Try<bool> RemoveResourceProvider::perform(Registry* registry)
{
  const auto& providers = registry->resource_providers();

  const auto hasMatchingId = [this](const ResourceProvider& candidate) {
    return candidate.id() == id;
  };

  const auto found =
    std::find_if(providers.begin(), providers.end(), hasMatchingId);

  if (found == providers.end()) {
    return Error("Attempted to remove an unknown resource provider");
  }

  registry->mutable_resource_providers()->erase(found);

  return true; // Mutation.
}
// Process backing `AgentRegistrar`. It keeps a copy of the resource
// provider registry, applies queued operations to it in batches and
// persists each batch through `state`. At most one store is in flight
// at a time (see `updating`).
class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
{
public:
AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId);
// Fetches the registry from `state` on the first call and caches the
// resulting future; later calls return the cached future.
Future<Nothing> recover();
// Queues `operation` once recovery has completed. Fails immediately if
// `recover()` has not been called yet.
Future<bool> apply(Owned<Registrar::Operation> operation);
// Continuation of `apply`: enqueues the operation and starts an update
// unless one is already in flight. Fails if `error` is set.
Future<bool> _apply(Owned<Registrar::Operation> operation);
// Applies all queued operations to a copy of `registry` and stores the
// result; `_update` is invoked when the store completes.
void update();
// Completion handler for the store started by `update()`. `applied`
// holds the operations that were part of that store.
void _update(
const Future<Option<Variable<Registry>>>& store,
const Registry& updatedRegistry,
deque<Owned<Registrar::Operation>> applied);
private:
// Backing storage. Declared before `state`, which is constructed from
// `storage.get()`.
Owned<Storage> storage;
// Use fully qualified type for `State` to disambiguate with `State`
// enumeration in `ProcessBase`.
mesos::state::protobuf::State state;
// Set by the first call to `recover()`; `None` until then.
Option<Future<Nothing>> recovered;
// Registry as of recovery or the last successful store.
Option<Registry> registry;
// State variable matching `registry`; mutated to build each store.
Option<Variable<Registry>> variable;
// Set when a store fails; once set, `_apply` rejects all operations.
Option<Error> error;
// Operations waiting to be included in the next update.
deque<Owned<Registrar::Operation>> operations;
// True from the start of `update()` until `_update` runs.
bool updating = false;
// Creates the storage backend for the registry located at `path`.
static Owned<Storage> createStorage(const std::string& path);
};
// Creates the storage backend used to persist the registry.
//
// The registrar uses LevelDB as underlying storage. Since LevelDB
// is currently not supported on Windows (see MESOS-5932), we fall
// back to in-memory storage there; `path` is only used by the
// LevelDB backend.
//
// TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path)
{
#ifdef __WINDOWS__
  LOG(WARNING)
    << "Persisting resource provider manager state is not supported on Windows";

  Storage* backend = new InMemoryStorage();
#else
  Storage* backend = new LevelDBStorage(path);
#endif // __WINDOWS__

  return Owned<Storage>(backend);
}
// Sets up the process with a generated ID, creates the storage backend
// at the resource provider registry path derived from the agent's work
// directory and ID, and layers `state` on top of that storage.
AgentRegistrarProcess::AgentRegistrarProcess(
    const slave::Flags& flags,
    const SlaveID& slaveId)
  : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
    storage(createStorage(
        slave::paths::getResourceProviderRegistryPath(flags.work_dir, slaveId))),
    state(storage.get())
{
}
// Recovers the registry from `state`.
//
// The fetch is only started on the first call; its future is cached in
// `recovered` and returned by every later call. When the fetch
// completes, the fetched variable and the registry it contains are
// stored in `variable` and `registry`.
Future<Nothing> AgentRegistrarProcess::recover()
{
  if (recovered.isSome()) {
    return recovered.get();
  }

  constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";

  recovered = state.fetch<Registry>(NAME).then(
      defer(self(), [this](const Variable<Registry>& fetched) {
        variable = fetched;
        registry = fetched.get();

        return Nothing();
      }));

  return recovered.get();
}
// Schedules `operation` to be applied after recovery has finished.
//
// Returns a failed future if `recover()` has never been called;
// otherwise chains `_apply` onto the recovery future.
Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
{
  if (recovered.isSome()) {
    return recovered->then(
        defer(self(), &Self::_apply, std::move(operation)));
  }

  return Failure("Attempted to apply the operation before recovering");
}
// Enqueues `operation` and kicks off an update unless one is already
// running. Returns the operation's future, or a failure if an earlier
// store failed and `error` is set.
Future<bool> AgentRegistrarProcess::_apply(
    Owned<Registrar::Operation> operation)
{
  if (error.isSome()) {
    return Failure(error.get());
  }

  // Grab the future first: `update()` hands the queued operations over
  // to the pending store.
  Future<bool> result = operation->future();

  operations.push_back(std::move(operation));

  if (!updating) {
    update();
  }

  return result;
}
// Applies every queued operation to a copy of the current registry and
// stores the result through `state`. `_update` runs once the store
// completes and receives the updated registry together with the
// operations that were part of this batch.
//
// Must only be called when no update is in flight and no error has
// been recorded. Does nothing if there are no queued operations.
void AgentRegistrarProcess::update()
{
  CHECK(!updating);
  CHECK_NONE(error);

  if (operations.empty()) {
    return; // No-op.
  }

  updating = true;

  CHECK_SOME(registry);
  Registry updatedRegistry = registry.get();

  foreach (Owned<Registrar::Operation>& operation, operations) {
    Try<bool> result = (*operation)(&updatedRegistry);

    // `Operation::operator()` already records the failure in its
    // `success` flag, but the error text would otherwise be lost, so
    // surface it in the log.
    if (result.isError()) {
      LOG(WARNING)
        << "Failed to apply operation on resource provider registry: "
        << result.error();
    }
  }

  // Serialize updated registry.
  CHECK_SOME(variable);

  Future<Option<Variable<Registry>>> store =
    state.store(variable->mutate(updatedRegistry));

  store.onAny(defer(
      self(),
      &Self::_update,
      lambda::_1,
      updatedRegistry,
      std::move(operations)));

  // The queue was moved from above; clear it so that it is in a
  // well-defined empty state for operations arriving during the store.
  operations.clear();
}
// Completion handler for the store started by `update()`.
//
// `store` is the result of the store, `updatedRegistry` the registry
// that was written, and `applied` the operations included in it.
//
// If the store did not succeed, every operation in `applied` and every
// operation still waiting in `operations` is failed, and `error` is
// set so that `_apply` rejects all further operations. Otherwise the
// new registry and variable are adopted, the applied operations'
// promises are fulfilled, and another update is started if operations
// were queued in the meantime.
void AgentRegistrarProcess::_update(
    const Future<Option<Variable<Registry>>>& store,
    const Registry& updatedRegistry,
    deque<Owned<Registrar::Operation>> applied)
{
  updating = false;

  // Abort if the storage operation did not succeed.
  if (!store.isReady() || store.get().isNone()) {
    string message = "Failed to update registry: ";

    if (store.isFailed()) {
      message += store.failure();
    } else if (store.isDiscarded()) {
      message += "discarded";
    } else {
      message += "version mismatch";
    }

    while (!applied.empty()) {
      applied.front()->fail(message);
      applied.pop_front();
    }

    error = Error(message);

    LOG(ERROR) << "Registrar aborting: " << message;

    // Operations queued while the store was in flight would never be
    // completed: with `error` set, `_apply` rejects new operations and
    // `update()` is not invoked again. Fail them as well.
    while (!operations.empty()) {
      operations.front()->fail(message);
      operations.pop_front();
    }

    return;
  }

  variable = store->get();
  registry = updatedRegistry;

  // Remove the operations.
  while (!applied.empty()) {
    Owned<Registrar::Operation> operation = applied.front();
    applied.pop_front();

    operation->set();
  }

  if (!operations.empty()) {
    update();
  }
}
// Creates the backing `AgentRegistrarProcess` and spawns it. `false`
// is passed as the second argument to `spawn`; the destructor
// terminates and waits for the process.
AgentRegistrar::AgentRegistrar(
    const slave::Flags& slaveFlags,
    const SlaveID& slaveId)
  : process(new AgentRegistrarProcess(slaveFlags, slaveId))
{
  spawn(process.get(), false);
}
// Terminates the backing process and waits for it before the
// registrar goes away.
AgentRegistrar::~AgentRegistrar()
{
  terminate(*process);
  wait(*process);
}
// Dispatches `recover` to the backing process and returns its future.
Future<Nothing> AgentRegistrar::recover()
{
  Future<Nothing> recovery =
    process::dispatch(process.get(), &AgentRegistrarProcess::recover);

  return recovery;
}
// Dispatches `apply` to the backing process, handing over ownership of
// `operation`, and returns the resulting future.
Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
{
  return process::dispatch(
      process.get(), &AgentRegistrarProcess::apply, std::move(operation));
}
// Process backing `MasterRegistrar`. It wraps resource provider
// registry operations so that they can be applied through the master
// registrar passed at construction.
class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
{
  // A helper class for adapting operations on the resource provider
  // registry to the master registry.
  class AdaptedOperation : public master::RegistryOperation
  {
  public:
    // Takes ownership of the wrapped operation. Marked `explicit` so
    // that an `Owned<Registrar::Operation>` is never silently
    // converted into an adapted operation.
    explicit AdaptedOperation(Owned<Registrar::Operation> operation);

  private:
    // Runs the wrapped operation against the resource provider
    // registry embedded in the master registry. The set of agent IDs
    // is unused.
    Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) override;

    // The resource provider registry operation being adapted.
    Owned<Registrar::Operation> operation;

    AdaptedOperation(const AdaptedOperation&) = delete;
    AdaptedOperation(AdaptedOperation&&) = default;
    AdaptedOperation& operator=(const AdaptedOperation&) = delete;
    AdaptedOperation& operator=(AdaptedOperation&&) = default;
  };

public:
  explicit MasterRegistrarProcess(master::Registrar* registrar);

  // Wraps `operation` in an `AdaptedOperation` and applies it through
  // the master registrar.
  Future<bool> apply(Owned<Registrar::Operation> operation);

private:
  // Master registrar that operations are forwarded to. Nothing in this
  // class deletes it.
  master::Registrar* registrar = nullptr;
};
// Takes ownership of the resource provider registry operation to adapt.
// The parameter shadows the member of the same name; the initializer
// moves the parameter into the member.
MasterRegistrarProcess::AdaptedOperation::AdaptedOperation(
    Owned<Registrar::Operation> operation)
  : operation(std::move(operation))
{
}
// Applies the wrapped operation to the resource provider registry
// stored inside the master `registry` and returns its result. The
// agent ID set is ignored.
Try<bool> MasterRegistrarProcess::AdaptedOperation::perform(
    internal::Registry* registry,
    hashset<SlaveID>*)
{
  auto* resourceProviderRegistry =
    registry->mutable_resource_provider_registry();

  return (*operation)(resourceProviderRegistry);
}
// Sets up the process with a generated ID and remembers the master
// registrar that operations are forwarded to.
//
// The ID prefix names the master registrar so that this process can be
// told apart from `AgentRegistrarProcess`, which uses the
// "resource-provider-agent-registrar" prefix.
MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar)
  : ProcessBase(process::ID::generate("resource-provider-master-registrar")),
    registrar(_registrar)
{
}
// Wraps `operation` in an `AdaptedOperation` and applies it through
// the master registrar, returning the master registrar's future.
Future<bool> MasterRegistrarProcess::apply(
    Owned<Registrar::Operation> operation)
{
  Owned<master::RegistryOperation> adapted(
      new AdaptedOperation(std::move(operation)));

  return registrar->apply(std::move(adapted));
}
// Creates the backing `MasterRegistrarProcess` and spawns it. `false`
// is passed as the second argument to `spawn`; the destructor
// terminates and waits for the process.
MasterRegistrar::MasterRegistrar(master::Registrar* registrar)
  : process(new MasterRegistrarProcess(registrar))
{
  process::spawn(process.get(), false);
}
// Terminates the backing process and waits for it before the
// registrar goes away.
MasterRegistrar::~MasterRegistrar()
{
  process::terminate(*process);
  process::wait(*process);
}
// This registrar performs no recovery work of its own, so an
// already-ready future is returned.
Future<Nothing> MasterRegistrar::recover()
{
  return Future<Nothing>(Nothing());
}
// Dispatches `apply` to the backing process, handing over ownership of
// `operation`, and returns the resulting future.
Future<bool> MasterRegistrar::apply(Owned<Operation> operation)
{
  return process::dispatch(
      process.get(), &MasterRegistrarProcess::apply, std::move(operation));
}
} // namespace resource_provider {
} // namespace mesos {