blob: 5c74471d5f61338080ad68cd9a7a6d6346682573 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <deque>
#include <string>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/help.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/metrics/gauge.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/timer.hpp>
#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/protobuf.hpp>
#include "common/type_utils.hpp"
#include "master/registrar.hpp"
#include "master/registry.hpp"
#include "state/protobuf.hpp"
using mesos::internal::state::protobuf::State;
using mesos::internal::state::protobuf::Variable;
using process::dispatch;
using process::spawn;
using process::terminate;
using process::wait; // Necessary on some OS's to disambiguate.
using process::DESCRIPTION;
using process::Failure;
using process::Future;
using process::HELP;
using process::Owned;
using process::Process;
using process::Promise;
using process::TLDR;
using process::USAGE;
using process::http::OK;
using process::metrics::Gauge;
using process::metrics::Timer;
using std::deque;
using std::string;
namespace mesos {
namespace internal {
namespace master {
using process::http::Response;
using process::http::Request;
// Actor that implements the Registrar. It recovers the Registry from
// 'state', serves its contents over HTTP at /registry, and applies
// queued Operations by folding every queued operation into a single
// State store at a time (see update() / _update()). Continuations and
// gauge handlers re-enter the actor through defer(), so (as far as this
// file shows) member state is only touched from within the actor.
class RegistrarProcess : public Process<RegistrarProcess>
{
public:
// NOTE: 'metrics' is declared before the other data members and is
// therefore initialized first; its gauges only call back into this
// process via defer(), not during construction.
RegistrarProcess(const Flags& _flags, State* _state)
: ProcessBase(process::ID::generate("registrar")),
metrics(*this),
updating(false),
flags(_flags),
state(_state) {}
virtual ~RegistrarProcess() {}
// Registrar implementation.
// recover(): fetches the Registry (first call only) and persists the
// given MasterInfo; all callers share one future.
Future<Registry> recover(const MasterInfo& info);
// apply(): queues 'operation' once recovery has completed; fails
// immediately if recover() was never called.
Future<bool> apply(Owned<Operation> operation);
protected:
// Installs the /registry HTTP endpoint served by registry().
virtual void initialize()
{
route("/registry", registryHelp(), &RegistrarProcess::registry);
}
private:
// HTTP handlers.
// /registrar(N)/registry
Future<Response> registry(const Request& request);
static string registryHelp();
// The 'Recover' operation adds the latest MasterInfo.
// It overwrites registry->master().info() with 'info' and always
// reports a mutation; 'slaveIDs' and 'strict' are not used.
class Recover : public Operation
{
public:
explicit Recover(const MasterInfo& _info) : info(_info) {}
protected:
virtual Try<bool> perform(
Registry* registry,
hashset<SlaveID>* slaveIDs,
bool strict)
{
registry->mutable_master()->mutable_info()->CopyFrom(info);
return true; // Mutation.
}
private:
const MasterInfo info;
};
// Metrics.
// Registered with process::metrics on construction and removed on
// destruction. The gauges are evaluated by deferring into the owning
// process; the timers wrap the State fetch (recover()) and store
// (update()) futures.
struct Metrics
{
explicit Metrics(const RegistrarProcess& process)
: queued_operations(
"registrar/queued_operations",
defer(process, &RegistrarProcess::_queued_operations)),
registry_size_bytes(
"registrar/registry_size_bytes",
defer(process, &RegistrarProcess::_registry_size_bytes)),
state_fetch("registrar/state_fetch"),
state_store("registrar/state_store", Days(1))
{
process::metrics::add(queued_operations);
process::metrics::add(registry_size_bytes);
process::metrics::add(state_fetch);
process::metrics::add(state_store);
}
~Metrics()
{
process::metrics::remove(queued_operations);
process::metrics::remove(registry_size_bytes);
process::metrics::remove(state_fetch);
process::metrics::remove(state_store);
}
Gauge queued_operations;
Gauge registry_size_bytes;
Timer<Milliseconds> state_fetch;
// NOTE(review): Days(1) presumably configures the timer's statistics
// window — confirm against process::metrics::Timer.
Timer<Milliseconds> state_store;
} metrics;
// Gauge handlers
// Number of operations waiting in 'operations', i.e. not yet handed to
// a store (update() clears the deque when it starts a store).
double _queued_operations()
{
return operations.size();
}
// Serialized size of the current Registry; fails until recovery has
// fetched one.
Future<double> _registry_size_bytes()
{
if (variable.isSome()) {
return variable.get().get().ByteSize();
}
return Failure("Not recovered yet");
}
// Continuations.
// _recover(): handles the fetch result; __recover(): handles the
// outcome of persisting the Recover operation.
void _recover(
const MasterInfo& info,
const Future<Variable<Registry> >& recovery);
void __recover(const Future<bool>& recover);
Future<bool> _apply(Owned<Operation> operation);
// Helper for updating state (performing store).
void update();
void _update(
const Future<Option<Variable<Registry> > >& store,
deque<Owned<Operation> > operations);
// Fails all pending operations and transitions the Registrar
// into an error state in which all subsequent operations will fail.
// This ensures we don't attempt to re-acquire log leadership by
// performing more State storage operations.
void abort(const string& message);
// Latest Registry fetched or successfully stored; None until the
// recovery fetch succeeds.
Option<Variable<Registry> > variable;
// Operations queued for the next store.
deque<Owned<Operation> > operations;
bool updating; // Used to signify fetching (recovering) or storing.
const Flags flags;
State* state;
// Used to compose our operations with recovery.
Option<Owned<Promise<Registry> > > recovered;
// When an error is encountered from abort(), we'll fail all
// subsequent operations.
Option<Error> error;
};
// Helper for treating State operations that timeout as failures.
//
// Installed through Future::after(): once 'duration' has elapsed
// without the State operation completing, the still-pending 'future'
// is discarded and replaced by a Failure that names the 'operation'
// and the elapsed 'duration'.
template <typename T>
Future<T> timeout(
    const string& operation,
    const Duration& duration,
    Future<T> future)
{
  // Give up on the underlying State operation first.
  future.discard();

  const string message =
    "Failed to perform " + operation + " within " + stringify(duration);

  return Failure(message);
}
// Helper for failing a deque of operations.
//
// Drains '*operations' from the front, failing each operation with
// 'message'. Each element is removed from the deque before it is
// failed, and the deque is empty on return.
void fail(deque<Owned<Operation> >* operations, const string& message)
{
  for (;;) {
    if (operations->empty()) {
      break;
    }

    Owned<Operation> head = operations->front();
    operations->pop_front();

    head->fail(message);
  }
}
// Handler for /registrar(N)/registry.
//
// Responds with the current Registry rendered as JSON, or with an empty
// JSON object when no Registry has been fetched yet. The optional
// 'jsonp' query parameter is passed through to OK().
Future<Response> RegistrarProcess::registry(const Request& request)
{
  const Option<string> jsonp = request.query.get("jsonp");

  if (variable.isNone()) {
    return OK(JSON::Object(), jsonp);
  }

  JSON::Object contents = JSON::Protobuf(variable.get().get());
  return OK(contents, jsonp);
}
// Help text for the /registry endpoint (registered in initialize()).
//
// Each DESCRIPTION() argument is one line of the rendered help. The
// blank line after "Example:" must be a separate argument. Previously a
// missing comma concatenated "Example:" and "" into a single literal,
// so the intended empty line was silently dropped.
string RegistrarProcess::registryHelp()
{
  return HELP(
      TLDR(
          "Returns the current contents of the Registry in JSON."),
      USAGE(
          "/registrar(1)/registry"),
      DESCRIPTION(
          "Example:",
          "",
          "```",
          "{",
          " \"master\":",
          " {",
          " \"info\":",
          " {",
          " \"hostname\": \"localhost\",",
          " \"id\": \"20140325-235542-1740121354-5050-33357\",",
          " \"ip\": 2130706433,",
          " \"pid\": \"master@127.0.0.1:5050\",",
          " \"port\": 5050",
          " }",
          " },",
          "",
          " \"slaves\":",
          " {",
          " \"slaves\":",
          " [",
          " {",
          " \"info\":",
          " {",
          " \"checkpoint\": true,",
          " \"hostname\": \"localhost\",",
          " \"id\":",
          " { ",
          " \"value\": \"20140325-234618-1740121354-5050-29065-0\"",
          " },",
          " \"port\": 5051,",
          " \"resources\":",
          " [",
          " {",
          " \"name\": \"cpus\",",
          " \"role\": \"*\",",
          " \"scalar\": { \"value\": 24 },",
          " \"type\": \"SCALAR\"",
          " }",
          " ],",
          " \"webui_hostname\": \"localhost\"",
          " }",
          " }",
          " ]",
          " }",
          "}",
          "```"));
}
// Begins recovery on the first call and returns the shared recovery
// future on every call.
//
// The first call fetches "registry" from 'state'. The fetch is timed by
// the state_fetch metric and turned into a failure after
// flags.registry_fetch_timeout. The result is handled by _recover(),
// which also persists 'info'. Later calls ignore 'info' and just return
// the same future.
Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
{
  if (recovered.isSome()) {
    return recovered.get()->future();
  }

  LOG(INFO) << "Recovering registrar";

  Future<Variable<Registry> > fetched =
    metrics.state_fetch.time(state->fetch<Registry>("registry"));

  fetched
    .after(flags.registry_fetch_timeout,
           lambda::bind(
               &timeout<Variable<Registry> >,
               "fetch",
               flags.registry_fetch_timeout,
               lambda::_1))
    .onAny(defer(self(), &Self::_recover, info, lambda::_1));

  // A fetch is now in flight.
  updating = true;
  recovered = Owned<Promise<Registry> >(new Promise<Registry>());

  return recovered.get()->future();
}
void RegistrarProcess::_recover(
const MasterInfo& info,
const Future<Variable<Registry> >& recovery)
{
updating = false;
CHECK(!recovery.isPending());
if (!recovery.isReady()) {
recovered.get()->fail("Failed to recover registrar: " +
(recovery.isFailed() ? recovery.failure() : "discarded"));
} else {
// Save the registry.
variable = recovery.get();
LOG(INFO) << "Successfully fetched the registry "
<< "(" << Bytes(variable.get().get().ByteSize()) << ")";
// Perform the Recover operation to add the new MasterInfo.
Owned<Operation> operation(new Recover(info));
operations.push_back(operation);
operation->future()
.onAny(defer(self(), &Self::__recover, lambda::_1));
update();
}
}
// Completes recovery once the Recover operation has been stored.
//
// On success, the recovery promise is set to the Registry in
// 'variable', which _update() has already refreshed to include the
// latest MasterInfo. Operations gated on recovery in apply() are then
// released. Otherwise, the promise is failed with the reason.
void RegistrarProcess::__recover(const Future<bool>& recover)
{
  CHECK(!recover.isPending());

  if (recover.isReady() && recover.get()) {
    LOG(INFO) << "Successfully recovered registrar";

    CHECK_SOME(variable);
    recovered.get()->set(variable.get().get());
    return;
  }

  if (recover.isReady()) {
    // The operation completed but reported 'false'.
    recovered.get()->fail("Failed to recover registrar: "
                          "Failed to persist MasterInfo: version mismatch");
    return;
  }

  recovered.get()->fail("Failed to recover registrar: "
                        "Failed to persist MasterInfo: " +
                        (recover.isFailed() ? recover.failure() : "discarded"));
}
// Queues 'operation' behind recovery.
//
// Fails immediately when recover() has not been called yet. Otherwise,
// the operation is handed to _apply() once the recovery future is
// satisfied. If recovery fails, that failure propagates to the
// returned future.
Future<bool> RegistrarProcess::apply(Owned<Operation> operation)
{
  if (recovered.isSome()) {
    return recovered.get()->future()
      .then(defer(self(), &Self::_apply, operation));
  }

  return Failure("Attempted to apply the operation before recovering");
}
// Enqueues 'operation' for the next store and returns its future.
//
// Fails right away once the registrar has aborted. If no fetch or store
// is in flight, a store is started immediately. Otherwise, _update()
// picks up the queued operation when the current store finishes.
Future<bool> RegistrarProcess::_apply(Owned<Operation> operation)
{
  if (error.isSome()) {
    return Failure(error.get());
  }

  CHECK_SOME(variable);

  operations.push_back(operation);
  const Future<bool> pending = operation->future();

  if (updating) {
    return pending;
  }

  update();
  return pending;
}
// Starts a store of the Registry with every queued operation applied.
//
// This is a no-op when 'operations' is empty. Otherwise:
//   1. The queued operations are applied, in order, to a snapshot of
//      the current Registry.
//   2. The result is stored through 'state', timed by the state_store
//      metric and turned into a failure after
//      flags.registry_store_timeout.
//   3. The batch is handed to _update(), which sets or fails each
//      operation.
// The queue is then cleared, so operations that arrive while this
// store is in flight form the next batch.
//
// Requires that no fetch/store is in flight, that abort() has not been
// called, and that a Registry has been recovered.
void RegistrarProcess::update()
{
  if (operations.empty()) {
    return; // No-op.
  }

  CHECK(!updating);
  CHECK(error.isNone());

  updating = true;

  LOG(INFO) << "Attempting to update the 'registry'";

  CHECK_SOME(variable);

  // Create a snapshot of the current registry.
  Registry registry = variable.get().get();

  // Create the 'slaveIDs' accumulator.
  hashset<SlaveID> slaveIDs;
  foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
    slaveIDs.insert(slave.info().id());
  }

  // Iterate by const reference to avoid copying each Owned<> handle.
  foreach (const Owned<Operation>& operation, operations) {
    // No need to process the result of the operation.
    (*operation)(&registry, &slaveIDs, flags.registry_strict);
  }

  // Perform the store, and time the operation.
  metrics.state_store.time(state->store(variable.get().mutate(registry)))
    .after(flags.registry_store_timeout,
           lambda::bind(
               &timeout<Option<Variable<Registry> > >,
               "store",
               flags.registry_store_timeout,
               lambda::_1))
    .onAny(defer(self(), &Self::_update, lambda::_1, operations));

  // Clear the operations, _update will transition the Promises!
  operations.clear();
}
void RegistrarProcess::_update(
const Future<Option<Variable<Registry> > >& store,
deque<Owned<Operation> > applied)
{
updating = false;
// Abort if the storage operation did not succeed.
if (!store.isReady() || store.get().isNone()) {
string message = "Failed to update 'registry': ";
if (store.isFailed()) {
message += store.failure();
} else if (store.isDiscarded()) {
message += "discarded";
} else {
message += "version mismatch";
}
fail(&applied, message);
abort(message);
return;
}
LOG(INFO) << "Successfully updated 'registry'";
variable = store.get().get();
// Remove the operations.
while (!applied.empty()) {
Owned<Operation> operation = applied.front();
applied.pop_front();
operation->set();
}
if (!operations.empty()) {
update();
}
}
// Puts the registrar into its terminal error state.
//
// Logs 'message' and records it as the error that _apply() returns from
// now on. Every operation still waiting in the queue is failed with the
// same message.
void RegistrarProcess::abort(const string& message)
{
  LOG(ERROR) << "Registrar aborting: " << message;

  error = Error(message);

  fail(&operations, message);
}
// Creates the backing RegistrarProcess and spawns it; the destructor
// terminates, waits for, and deletes it.
Registrar::Registrar(const Flags& flags, State* state)
  : process(new RegistrarProcess(flags, state))
{
  spawn(process);
}
// Stops the RegistrarProcess, blocks until it has exited, and only then
// frees it.
Registrar::~Registrar()
{
  RegistrarProcess* actor = process;

  terminate(actor);
  wait(actor);

  delete actor;
}
// Forwards to RegistrarProcess::recover() on the registrar actor.
Future<Registry> Registrar::recover(const MasterInfo& info)
{
  const Future<Registry> recovery =
    dispatch(process, &RegistrarProcess::recover, info);

  return recovery;
}
// Forwards to RegistrarProcess::apply() on the registrar actor.
Future<bool> Registrar::apply(Owned<Operation> operation)
{
  const Future<bool> applied =
    dispatch(process, &RegistrarProcess::apply, operation);

  return applied;
}
} // namespace master {
} // namespace internal {
} // namespace mesos {