blob: 0dc49e689876a738c2049bbb2f60c5ea26c8bafb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "resource_provider/registrar.hpp"
#include <algorithm>
#include <deque>
#include <string>
#include <utility>
#include <glog/logging.h>
#include <mesos/type_utils.hpp>
#include <mesos/state/in_memory.hpp>
#include <mesos/state/protobuf.hpp>
#include <process/defer.hpp>
#include <process/process.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
#include <stout/unreachable.hpp>
#include "master/registrar.hpp"
#include "slave/paths.hpp"
using std::deque;
using std::string;
using mesos::resource_provider::registry::Registry;
using mesos::resource_provider::registry::ResourceProvider;
using mesos::state::Storage;
using mesos::state::protobuf::Variable;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
using process::Promise;
using process::defer;
using process::spawn;
using process::terminate;
using process::wait;
namespace master = mesos::internal::master;
namespace slave = mesos::internal::slave;
namespace mesos {
namespace resource_provider {
// Runs `perform` against `registry`, records in `success` whether it
// completed without an error, and hands the result back unchanged.
Try<bool> Registrar::Operation::operator()(Registry* registry)
{
  Try<bool> outcome = perform(registry);

  // An operation counts as successful whenever `perform` did not
  // produce an error, regardless of whether it mutated the registry.
  success = outcome.isSome();

  return outcome;
}
// Completes the operation's `Promise<bool>` with the `success` value
// recorded by `operator()`. Returns whatever `Promise<bool>::set` returns.
bool Registrar::Operation::set()
{
  const bool completed = Promise<bool>::set(success);
  return completed;
}
// Creates a registrar which persists the resource provider registry
// through the given `storage`.
Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
{
  Owned<Registrar> registrar(new GenericRegistrar(std::move(storage)));

  return registrar;
}
// Creates a registrar which forwards operations to the given master
// `registrar`; `registry` is the resource provider registry handed back
// by `recover()`.
Try<Owned<Registrar>> Registrar::create(
    master::Registrar* registrar,
    Registry registry)
{
  Owned<Registrar> adapter(
      new MasterRegistrar(registrar, std::move(registry)));

  return adapter;
}
// Stores a copy of the resource provider that `perform` will admit.
AdmitResourceProvider::AdmitResourceProvider(
    const ResourceProvider& _resourceProvider)
  : resourceProvider(_resourceProvider)
{
}
// Adds the resource provider to `registry`.
//
// Fails if a provider with the same ID is already present among either
// the admitted or the removed resource providers; in that case the
// registry is left untouched. Returns `true` on success to signal that
// the registry was mutated.
Try<bool> AdmitResourceProvider::perform(Registry* registry)
{
  // Matches registry entries carrying the ID of the provider to admit.
  auto hasSameId = [this](const ResourceProvider& candidate) {
    return candidate.id() == this->resourceProvider.id();
  };

  const auto& admitted = registry->resource_providers();
  if (std::any_of(admitted.begin(), admitted.end(), hasSameId)) {
    return Error("Resource provider already admitted");
  }

  const auto& removed = registry->removed_resource_providers();
  if (std::any_of(removed.begin(), removed.end(), hasSameId)) {
    return Error("Resource provider was removed");
  }

  registry->add_resource_providers()->CopyFrom(resourceProvider);

  return true; // Mutation.
}
// Stores the ID of the resource provider that `perform` will remove.
RemoveResourceProvider::RemoveResourceProvider(const ResourceProviderID& _id)
  : id(_id)
{
}
// Moves the resource provider identified by `id` from the admitted
// providers of `registry` to its removed providers.
//
// Fails without touching the registry if no admitted provider has that
// ID. Returns `true` on success to signal that the registry was mutated.
Try<bool> RemoveResourceProvider::perform(Registry* registry)
{
  const auto& admitted = registry->resource_providers();

  auto entry = std::find_if(
      admitted.begin(),
      admitted.end(),
      [this](const ResourceProvider& candidate) {
        return candidate.id() == this->id;
      });

  if (entry == admitted.end()) {
    return Error("Attempted to remove an unknown resource provider");
  }

  // Record the provider as removed before erasing it, since `entry`
  // refers to the element that is about to be erased.
  registry->add_removed_resource_providers()->CopyFrom(*entry);
  registry->mutable_resource_providers()->erase(entry);

  return true; // Mutation.
}
// Process backing `GenericRegistrar`. It keeps the resource provider
// registry in a `mesos::state::protobuf::State` built on top of the
// given storage, and batches queued operations into a single store.
class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
{
public:
  GenericRegistrarProcess(Owned<Storage> storage);

  // Starts fetching the registry from the state.
  void initialize() override;

  // Returns the registry once it has been fetched.
  Future<Registry> recover();

  // Queues `operation`; it is only considered after recovery completed.
  Future<bool> apply(Owned<Registrar::Operation> operation);

  // Applies all queued operations to a copy of the registry and stores
  // the result. Must not be called while an update is in flight.
  void update();

private:
  Future<bool> _apply(Owned<Registrar::Operation> operation);

  // Continuation of `update()` which is run once the store of the
  // registry finished; `applied` are the operations of that batch.
  void _update(
      const Future<Option<Variable<Registry>>>& store,
      std::deque<Owned<Registrar::Operation>> applied);

  // We explicitly hold the storage to keep it alive over the
  // registrar's lifetime.
  Owned<Storage> storage;

  // Use fully qualified type for `State` to disambiguate with `State`
  // enumeration in `ProcessBase`.
  //
  // NOTE: `state` is constructed from `storage.get()`, so it has to be
  // declared after `storage`.
  mesos::state::protobuf::State state;

  // Associated with the fetch of the registry in `initialize()`.
  Promise<Nothing> recovered;

  // The most recently fetched or stored registry variable.
  Option<Variable<Registry>> variable;

  // Set once a store failed; later operations are rejected with it.
  Option<Error> error;

  // Operations waiting to be included in the next update.
  std::deque<Owned<Registrar::Operation>> operations;

  // Whether a store of the registry is currently in flight.
  bool updating = false;
};
// Takes ownership of `_storage` and builds the protobuf `state` on top
// of it. A null storage is a programming error and aborts.
GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
  : ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
    storage(std::move(_storage)),
    state(storage.get())
{
  // NOTE: This check inspects the member, not the moved-from parameter.
  CHECK_NOTNULL(storage.get());
}
void GenericRegistrarProcess::initialize()
{
constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
CHECK_NONE(variable);
recovered.associate(state.fetch<Registry>(NAME).then(
defer(self(), [this](const Variable<Registry>& recovery) {
variable = recovery;
return Nothing();
})));
}
// Returns the registry held by the recovered `variable` once recovery
// has completed.
Future<Registry> GenericRegistrarProcess::recover()
{
  // Runs in the context of this process after recovery finished.
  auto registryOf = [this](const Nothing&) {
    CHECK_SOME(this->variable);
    return this->variable->get();
  };

  // Prevent discards on the returned `Future` by marking the result as
  // `undiscardable` so that we control the lifetime of the recovering
  // registry.
  Future<Nothing> recovery = undiscardable(recovered.future());

  return recovery.then(defer(self(), std::move(registryOf)));
}
// Hands `operation` to `_apply` once recovery has completed. Discarding
// the returned future does not discard the recovery itself.
Future<bool> GenericRegistrarProcess::apply(
    Owned<Registrar::Operation> operation)
{
  Future<Nothing> recovery = undiscardable(recovered.future());

  return recovery.then(
      defer(self(), &Self::_apply, std::move(operation)));
}
// Queues `operation` and triggers an update unless one is already in
// flight. Fails right away if the registrar has previously aborted.
Future<bool> GenericRegistrarProcess::_apply(
    Owned<Registrar::Operation> operation)
{
  if (error.isSome()) {
    return Failure(error.get());
  }

  // Grab the future before ownership of the operation moves into the
  // queue.
  Future<bool> result = operation->future();

  operations.push_back(std::move(operation));

  // If an update is in flight the operation stays queued; `_update`
  // starts the next update when it finds a non-empty queue.
  if (!updating) {
    update();
  }

  return result;
}
void GenericRegistrarProcess::update()
{
CHECK(!updating);
CHECK_NONE(error);
if (operations.empty()) {
return; // No-op.
}
updating = true;
CHECK_SOME(variable);
Registry updatedRegistry = variable->get();
foreach (Owned<Registrar::Operation>& operation, operations) {
Try<bool> operationResult = (*operation)(&updatedRegistry);
if (operationResult.isError()) {
LOG(WARNING)
<< "Failed to apply operation on resource provider manager registry: "
<< operationResult.error();
}
}
// Serialize updated registry.
CHECK_SOME(variable);
Future<Option<Variable<Registry>>> store =
state.store(variable->mutate(updatedRegistry));
store.onAny(defer(
self(),
&Self::_update,
lambda::_1,
std::move(operations)));
operations.clear();
}
// Continuation of `update()` which runs once the store of the mutated
// registry has completed.
//
// `store` is the result of storing the registry; `applied` holds the
// operations which were folded into the stored registry.
//
// If the store succeeded, `variable` is replaced with the stored
// variable, every applied operation is completed via `set()`, and
// another update is started if operations were queued in the meantime.
//
// If the store failed, was discarded, or returned `None`, the registrar
// aborts: `error` is set, and both the applied operations and the
// operations still waiting in the queue are failed with the same message.
void GenericRegistrarProcess::_update(
    const Future<Option<Variable<Registry>>>& store,
    deque<Owned<Registrar::Operation>> applied)
{
  updating = false;

  // Abort if the storage operation did not succeed.
  if (!store.isReady() || store->isNone()) {
    string message = "Failed to update registry: ";

    if (store.isFailed()) {
      message += store.failure();
    } else if (store.isDiscarded()) {
      message += "discarded";
    } else {
      message += "version mismatch";
    }

    while (!applied.empty()) {
      applied.front()->fail(message);
      applied.pop_front();
    }

    // Operations queued by `_apply` while the store was in flight would
    // otherwise never complete: with `error` set no further update is
    // started and nothing else touches the queue. Fail them as well.
    while (!operations.empty()) {
      operations.front()->fail(message);
      operations.pop_front();
    }

    error = Error(message);

    LOG(ERROR) << "Registrar aborting: " << message;

    return;
  }

  variable = store->get();

  // Remove the operations.
  while (!applied.empty()) {
    Owned<Registrar::Operation> operation = applied.front();
    applied.pop_front();

    operation->set();
  }

  // Pick up operations that were queued while the store was in flight.
  if (!operations.empty()) {
    update();
  }
}
// Creates the backing process and spawns it. The process is spawned
// unmanaged; it is terminated and awaited in the destructor.
GenericRegistrar::GenericRegistrar(Owned<Storage> storage)
  : process(new GenericRegistrarProcess(std::move(storage)))
{
  const bool manage = false;

  spawn(process.get(), manage);
}
// Terminates the backing process and blocks until it has exited.
GenericRegistrar::~GenericRegistrar()
{
  terminate(*process);
  wait(*process);
}
// Dispatches to `GenericRegistrarProcess::recover`.
Future<Registry> GenericRegistrar::recover()
{
  return process::dispatch(
      process.get(),
      &GenericRegistrarProcess::recover);
}
// Dispatches `operation` to `GenericRegistrarProcess::apply`,
// transferring ownership of the operation to the process.
Future<bool> GenericRegistrar::apply(Owned<Operation> operation)
{
  return process::dispatch(
      process.get(),
      &GenericRegistrarProcess::apply,
      std::move(operation));
}
// Process backing `MasterRegistrar`. Operations on the resource provider
// registry are wrapped and forwarded to a master registrar; the registry
// handed out by `recover()` is the one passed in at construction.
class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
{
  // A helper class for adapting operations on the resource provider
  // registry to the master registry.
  class AdaptedOperation : public master::RegistryOperation
  {
  public:
    // Takes ownership of the wrapped resource provider `operation`.
    explicit AdaptedOperation(Owned<Registrar::Operation> operation);

  private:
    // Applies the wrapped operation to the resource provider registry
    // embedded in the master `registry`. The set of agent IDs is unused.
    Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) override;

    Owned<Registrar::Operation> operation;

    AdaptedOperation(const AdaptedOperation&) = delete;
    AdaptedOperation(AdaptedOperation&&) = default;
    AdaptedOperation& operator=(const AdaptedOperation&) = delete;
    AdaptedOperation& operator=(AdaptedOperation&&) = default;
  };

public:
  explicit MasterRegistrarProcess(
      master::Registrar* registrar,
      Registry registry);

  // Wraps `operation` in an `AdaptedOperation` and applies it through
  // the master registrar.
  Future<bool> apply(Owned<Registrar::Operation> operation);

  // Returns the registry this process was constructed with.
  Future<registry::Registry> recover() { return registry; }

private:
  // Not owned; dereferenced by `apply`.
  master::Registrar* registrar = nullptr;

  Registry registry;
};
// Takes ownership of the resource provider registry operation to adapt.
MasterRegistrarProcess::AdaptedOperation::AdaptedOperation(
    Owned<Registrar::Operation> operation)
  : operation(std::move(operation))
{
}
// Runs the wrapped operation against the resource provider registry
// embedded in the master `registry` and returns its result. The set of
// agent IDs passed by the master registrar is not used.
Try<bool> MasterRegistrarProcess::AdaptedOperation::perform(
    internal::Registry* registry,
    hashset<SlaveID>*)
{
  Registry* providerRegistry = registry->mutable_resource_provider_registry();

  return (*operation)(providerRegistry);
}
// Remembers the master registrar to forward operations to and takes
// ownership of the registry returned by `recover()`.
MasterRegistrarProcess::MasterRegistrarProcess(
    master::Registrar* _registrar,
    registry::Registry _registry)
  : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
    registrar(_registrar),
    registry(std::move(_registry))
{
}
// Wraps `operation` so that it can be applied to the master registry
// and forwards it to the master registrar.
Future<bool> MasterRegistrarProcess::apply(
    Owned<Registrar::Operation> operation)
{
  Owned<master::RegistryOperation> adapted(
      new AdaptedOperation(std::move(operation)));

  return registrar->apply(std::move(adapted));
}
// Creates the backing process and spawns it. The process is spawned
// unmanaged; it is terminated and awaited in the destructor.
MasterRegistrar::MasterRegistrar(
    master::Registrar* registrar,
    registry::Registry registry)
  : process(new MasterRegistrarProcess(registrar, std::move(registry)))
{
  const bool manage = false;

  process::spawn(process.get(), manage);
}
// Terminates the backing process and blocks until it has exited.
MasterRegistrar::~MasterRegistrar()
{
  process::terminate(*process);
  process::wait(*process);
}
// Dispatches to `MasterRegistrarProcess::recover`.
Future<Registry> MasterRegistrar::recover()
{
  return process::dispatch(
      process.get(),
      &MasterRegistrarProcess::recover);
}
// Dispatches `operation` to `MasterRegistrarProcess::apply`,
// transferring ownership of the operation to the process.
Future<bool> MasterRegistrar::apply(Owned<Operation> operation)
{
  return process::dispatch(
      process.get(),
      &MasterRegistrarProcess::apply,
      std::move(operation));
}
} // namespace resource_provider {
} // namespace mesos {