// blob: 9274435802d6292b183be48f42b43999476e016e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <set>
#include <string>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/logging.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include <stout/protobuf.hpp>
#include "common/protobuf_utils.hpp"
#include "master/constants.hpp"
#include "master/detector.hpp"
#include "master/master.hpp"
#include "messages/messages.hpp"
#include "zookeeper/detector.hpp"
#include "zookeeper/group.hpp"
#include "zookeeper/url.hpp"
using namespace process;
using namespace zookeeper;
using std::set;
using std::string;
namespace mesos {
namespace internal {
// Timeout (10 seconds) handed to the zookeeper::Group that
// ZooKeeperMasterDetectorProcess builds when it is constructed from a
// zookeeper::URL.
const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
// TODO(bmahler): Consider moving these kinds of helpers into
// libprocess or a common header within mesos.
namespace promises {
// Fulfils every pending promise in `promises` with the value `t`.
// Each promise is deleted once it has been set, and the container is
// left empty afterwards.
template <typename T>
void set(std::set<Promise<T>* >* promises, const T& t)
{
  for (Promise<T>* pending : *promises) {
    pending->set(t);
    delete pending;
  }

  promises->clear();
}
// Helper for failing a set of Promises.
//
// Fails every promise in `promises` with the message `failure`,
// deletes each promise, and leaves the set empty.
template <typename T>
void fail(std::set<Promise<T>* >* promises, const string& failure)
{
  // The element type must follow the template parameter `T` (not a
  // hard-coded `Option<MasterInfo>`), otherwise this helper only
  // compiles for a single instantiation.
  foreach (Promise<T>* promise, *promises) {
    promise->fail(failure);
    delete promise;
  }
  promises->clear();
}
// Discards every pending promise in `promises`. Each promise is
// deleted after being discarded, and the container is emptied.
template <typename T>
void discard(std::set<Promise<T>* >* promises)
{
  for (Promise<T>* pending : *promises) {
    pending->discard();
    delete pending;
  }

  promises->clear();
}
// Discards the single promise in `promises` whose future equals
// `future`. The matching promise is removed from the set and deleted;
// if no promise matches, the set is left untouched.
template <typename T>
void discard(std::set<Promise<T>* >* promises, const Future<T>& future)
{
  typename std::set<Promise<T>* >::iterator it = promises->begin();

  for (; it != promises->end(); ++it) {
    Promise<T>* candidate = *it;

    if (!(candidate->future() == future)) {
      continue;
    }

    // Found the owner of this future: discard it, drop it from the
    // set and stop (the iterator is invalid after the erase).
    candidate->discard();
    promises->erase(it);
    delete candidate;
    return;
  }
}
} // namespace promises {
// Process backing the standalone detector: the leading master is
// whatever was last passed to appoint() (or to the constructor), and
// callers of detect() that already know that leader are parked on a
// promise until the leader changes.
class StandaloneMasterDetectorProcess
  : public Process<StandaloneMasterDetectorProcess>
{
public:
  // Starts without any appointed leader.
  StandaloneMasterDetectorProcess()
    : ProcessBase(ID::generate("standalone-master-detector")) {}

  // Starts with `_leader` already appointed.
  explicit StandaloneMasterDetectorProcess(const MasterInfo& _leader)
    : ProcessBase(ID::generate("standalone-master-detector")),
      leader(_leader) {}

  // Any detection still pending at destruction time is discarded.
  ~StandaloneMasterDetectorProcess()
  {
    promises::discard(&promises);
  }

  // Records the new leader and completes every pending detection
  // with it.
  void appoint(const Option<MasterInfo>& leader_)
  {
    leader = leader_;
    promises::set(&promises, leader);
  }

  // Returns the current leader right away when it differs from
  // `previous`; otherwise returns a future that is completed by a
  // later appoint() (or discarded on request / destruction).
  Future<Option<MasterInfo> > detect(
      const Option<MasterInfo>& previous = None())
  {
    if (leader == previous) {
      Promise<Option<MasterInfo> >* pending =
        new Promise<Option<MasterInfo> >();

      Future<Option<MasterInfo> > future = pending->future();

      // Let the caller cancel the detection by discarding the future.
      future.onDiscard(defer(self(), &Self::discard, future));

      promises.insert(pending);
      return future;
    }

    return leader;
  }

private:
  // Discards (and forgets) the promise that owns `future`.
  void discard(const Future<Option<MasterInfo> >& future)
  {
    promises::discard(&promises, future);
  }

  Option<MasterInfo> leader; // The appointed master.
  set<Promise<Option<MasterInfo> >*> promises; // Pending detections.
};
// Process backing the ZooKeeper based detector. It watches a
// zookeeper::Group through a LeaderDetector, fetches the data stored
// for the leading membership and caches the decoded MasterInfo in
// `leader`. Callers of detect() that already know the cached leader
// are kept in `promises` until the leader changes or an error occurs.
class ZooKeeperMasterDetectorProcess
: public Process<ZooKeeperMasterDetectorProcess>
{
public:
// Builds its own Group from the servers, path and authentication
// carried by `url`.
explicit ZooKeeperMasterDetectorProcess(const zookeeper::URL& url);
// Uses the supplied Group.
explicit ZooKeeperMasterDetectorProcess(Owned<Group> group);
// Discards every detection that is still pending.
~ZooKeeperMasterDetectorProcess();
// Kicks off the first leader detection.
virtual void initialize();
// Returns the cached leader if it differs from `previous`, a failed
// future if `error` is set, and otherwise a future that is completed
// on the next leadership change.
Future<Option<MasterInfo> > detect(const Option<MasterInfo>& previous);
private:
// Discards the pending promise that owns `future`.
void discard(const Future<Option<MasterInfo> >& future);
// Invoked when the group leadership has changed.
void detected(const Future<Option<Group::Membership> >& leader);
// Invoked when we have fetched the data associated with the leader.
void fetched(
const Group::Membership& membership,
const Future<Option<string> >& data);
// NOTE: `group` must stay declared before `detector`: the constructor
// initializes `detector` from `group.get()` and members are
// initialized in declaration order.
Owned<Group> group;
LeaderDetector detector;
// The leading Master.
Option<MasterInfo> leader;
// Detections waiting for a leadership change.
set<Promise<Option<MasterInfo> >*> promises;
// Potential non-retryable error.
Option<Error> error;
};
// Creates a detector for the given detection mechanism:
//   - None:           a StandaloneMasterDetector without a leader.
//   - "zk://...":     a ZooKeeperMasterDetector for the parsed URL; a
//                     URL whose path is "/" is rejected.
//   - "file://path":  (deprecated) the file is read, trimmed and its
//                     content is used as the mechanism.
//   - anything else:  parsed as a master PID ("master@" is prepended
//                     when missing) for a StandaloneMasterDetector.
//
// Returns an Error when the mechanism cannot be parsed or the file
// cannot be read. On success the returned detector is allocated with
// `new`.
Try<MasterDetector*> MasterDetector::create(const Option<string>& _mechanism)
{
  if (_mechanism.isNone()) {
    return new StandaloneMasterDetector();
  }

  string mechanism = _mechanism.get();

  if (strings::startsWith(mechanism, "zk://")) {
    Try<zookeeper::URL> url = zookeeper::URL::parse(mechanism);
    if (url.isError()) {
      return Error(url.error());
    }
    if (url.get().path == "/") {
      return Error(
          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
    }
    return new ZooKeeperMasterDetector(url.get());
  }

  if (strings::startsWith(mechanism, "file://")) {
    // Load the configuration out of a file. While Mesos and related
    // programs always use <stout/flags> to process the command line
    // arguments (and therefore file://) this entrypoint is exposed by
    // libmesos, with frameworks currently calling it and expecting it
    // to do the argument parsing for them which roughly matches the
    // argument parsing Mesos will do.
    // TODO(cmaloney): Rework the libmesos exposed APIs to expose
    // A "flags" endpoint where the framework can pass the command
    // line arguments and they will be parsed by <stout/flags> and the
    // needed flags extracted, and then change this interface to
    // require final values from the flags. This means that a
    // framework doesn't need to know how the flags are passed to
    // match mesos' command line arguments if it wants, but if it
    // needs to inspect/manipulate arguments, it can.
    LOG(WARNING) << "Specifying master detection mechanism / ZooKeeper URL to "
                    "be read out of a file via 'file://' is deprecated inside "
                    "Mesos and will be removed in a future release.";

    // Strip the leading "file://" (7 characters).
    const string path = mechanism.substr(7);

    const Try<string> read = os::read(path);
    if (read.isError()) {
      // Surface the underlying reason instead of dropping it, so the
      // caller can tell a missing file from a permission problem.
      return Error(
          "Failed to read from file at '" + path + "': " + read.error());
    }

    return create(strings::trim(read.get()));
  }

  // Every "file://" mechanism has returned above, so what is left is
  // expected to be a PID, with or without the "master@" prefix.
  UPID pid = strings::startsWith(mechanism, "master@")
    ? UPID(mechanism)
    : UPID("master@" + mechanism);

  if (!pid) {
    return Error("Failed to parse '" + mechanism + "'");
  }

  return new StandaloneMasterDetector(protobuf::createMasterInfo(pid));
}
// Out-of-line definition of the (empty) MasterDetector destructor.
MasterDetector::~MasterDetector() {}
// Creates and spawns the backing process with no leader appointed.
// The process is released in the destructor.
StandaloneMasterDetector::StandaloneMasterDetector()
{
process = new StandaloneMasterDetectorProcess();
spawn(process);
}
// Creates and spawns the backing process with `leader` already
// appointed.
StandaloneMasterDetector::StandaloneMasterDetector(const MasterInfo& leader)
{
process = new StandaloneMasterDetectorProcess(leader);
spawn(process);
}
// Creates and spawns the backing process with the master identified
// by the PID `leader` already appointed.
StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader)
{
  // The process works on MasterInfo, so wrap the PID first.
  const MasterInfo info = protobuf::createMasterInfo(leader);

  process = new StandaloneMasterDetectorProcess(info);
  spawn(process);
}
// Shuts down the backing process: request termination, wait for it
// to finish, and only then free it.
StandaloneMasterDetector::~StandaloneMasterDetector()
{
terminate(process);
process::wait(process);
delete process;
}
// Appoints `leader` (possibly None) by dispatching to the backing
// process; the call itself does not wait for the dispatch to run.
void StandaloneMasterDetector::appoint(const Option<MasterInfo>& leader)
{
dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
}
void StandaloneMasterDetector::appoint(const UPID& leader)
{
dispatch(process,
&StandaloneMasterDetectorProcess::appoint,
protobuf::createMasterInfo(leader));
}
// Forwards the detection request to the backing process and returns
// the future produced by StandaloneMasterDetectorProcess::detect.
Future<Option<MasterInfo> > StandaloneMasterDetector::detect(
const Option<MasterInfo>& previous)
{
return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
}
// TODO(benh): Get ZooKeeper timeout from configuration.
// Delegates to the Owned<Group> constructor with a Group created from
// the servers, path and authentication of `url`, using
// MASTER_DETECTOR_ZK_SESSION_TIMEOUT as the timeout argument.
ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
const zookeeper::URL& url)
: ZooKeeperMasterDetectorProcess(Owned<Group>(
new Group(url.servers,
MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
url.path,
url.authentication))) {}
// Takes the Group to watch. `detector` is initialized from
// `group.get()`, which relies on `group` being declared (and therefore
// initialized) before `detector` in the class. No leader is known
// initially.
ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
Owned<Group> _group)
: ProcessBase(ID::generate("zookeeper-master-detector")),
group(_group),
detector(group.get()),
leader(None()) {}
// Discards (and deletes) every promise that is still pending.
ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
{
promises::discard(&promises);
}
// Starts the detection loop: asks the LeaderDetector for the leader
// and routes the outcome to detected(), which re-arms the detection.
void ZooKeeperMasterDetectorProcess::initialize()
{
detector.detect()
.onAny(defer(self(), &Self::detected, lambda::_1));
}
// Callback registered via onDiscard() in detect(): drops the pending
// promise that owns `future`.
void ZooKeeperMasterDetectorProcess::discard(
const Future<Option<MasterInfo> >& future)
{
// Discard the promise holding this future.
promises::discard(&promises, future);
}
// Answers a detection request. Fails immediately once a non-retryable
// error has been recorded, returns the cached leader when it differs
// from `previous`, and otherwise parks the caller on a promise that is
// resolved by a later leadership change.
Future<Option<MasterInfo> > ZooKeeperMasterDetectorProcess::detect(
    const Option<MasterInfo>& previous)
{
  // The detector is no longer operational after a non-retryable
  // error, so do not hand out futures that would never complete.
  if (error.isSome()) {
    return Failure(error.get().message);
  }

  if (leader == previous) {
    // Nothing new to report yet: remember the caller until the
    // leader changes.
    Promise<Option<MasterInfo> >* pending =
      new Promise<Option<MasterInfo> >();

    Future<Option<MasterInfo> > future = pending->future();

    future.onDiscard(defer(self(), &Self::discard, future));

    promises.insert(pending);
    return future;
  }

  return leader;
}
void ZooKeeperMasterDetectorProcess::detected(
const Future<Option<Group::Membership> >& _leader)
{
CHECK(!_leader.isDiscarded());
if (_leader.isFailed()) {
LOG(ERROR) << "Failed to detect the leader: " << _leader.failure();
// Setting this error stops the detection loop and the detector
// transitions to an erroneous state. Further calls to detect()
// will directly fail as a result.
error = Error(_leader.failure());
leader = None();
promises::fail(&promises, _leader.failure());
return;
}
if (_leader.get().isNone()) {
leader = None();
promises::set(&promises, leader);
} else {
// Fetch the data associated with the leader.
group->data(_leader.get().get())
.onAny(defer(self(), &Self::fetched, _leader.get().get(), lambda::_1));
}
// Keep trying to detect leadership changes.
detector.detect(_leader.get())
.onAny(defer(self(), &Self::detected, lambda::_1));
}
// Handles the data fetched for the leading membership. The payload is
// decoded according to the membership label (none: a bare PID,
// MASTER_INFO_LABEL: binary protobuf, MASTER_INFO_JSON_LABEL: JSON).
// On success the decoded MasterInfo becomes the cached leader and all
// pending promises are set with it; on any failure the leader is
// cleared and the pending promises are failed.
void ZooKeeperMasterDetectorProcess::fetched(
    const Group::Membership& membership,
    const Future<Option<string> >& data)
{
  CHECK(!data.isDiscarded());

  if (data.isFailed()) {
    leader = None();
    promises::fail(&promises, data.failure());
    return;
  }

  const Option<string>& payload = data.get();

  if (payload.isNone()) {
    // The membership disappeared before its data could be read.
    leader = None();
    promises::set(&promises, leader);
    return;
  }

  const string& bytes = payload.get();
  const Option<string> label = membership.label();

  if (label.isNone()) {
    // No label means the master wrote its znode in the old format,
    // where the data is just the PID.
    const UPID pid(bytes);
    LOG(WARNING) << "Leading master " << pid << " has data in old format";
    leader = protobuf::createMasterInfo(pid);
  } else {
    const string& name = label.get();

    if (name == master::MASTER_INFO_LABEL) {
      MasterInfo info;
      if (!info.ParseFromString(bytes)) {
        leader = None();
        promises::fail(&promises, "Failed to parse data into MasterInfo");
        return;
      }

      LOG(WARNING) << "Leading master " << info.pid()
                   << " is using a Protobuf binary format when registering "
                   << "with ZooKeeper (" << name << "): this will be "
                   << "deprecated as of Mesos 0.24 (see MESOS-2340)";

      leader = info;
    } else if (name == master::MASTER_INFO_JSON_LABEL) {
      Try<JSON::Object> object = JSON::parse<JSON::Object>(bytes);
      if (object.isError()) {
        leader = None();
        promises::fail(
            &promises,
            "Failed to parse data into valid JSON: " + object.error());
        return;
      }

      Try<mesos::MasterInfo> info =
        ::protobuf::parse<mesos::MasterInfo>(object.get());
      if (info.isError()) {
        leader = None();
        promises::fail(
            &promises,
            "Failed to parse JSON into a valid MasterInfo protocol buffer: " +
            info.error());
        return;
      }

      leader = info.get();
    } else {
      leader = None();
      promises::fail(
          &promises,
          "Failed to parse data of unknown label '" + name + "'");
      return;
    }
  }

  // Only reached when a leader was decoded successfully.
  LOG(INFO) << "A new leading master (UPID="
            << UPID(leader.get().pid()) << ") is detected";

  promises::set(&promises, leader);
}
// Creates and spawns the backing process for the given ZooKeeper URL.
// The process is released in the destructor.
ZooKeeperMasterDetector::ZooKeeperMasterDetector(const zookeeper::URL& url)
{
process = new ZooKeeperMasterDetectorProcess(url);
spawn(process);
}
// Creates and spawns the backing process on top of the supplied Group.
ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
{
process = new ZooKeeperMasterDetectorProcess(group);
spawn(process);
}
// Shuts down the backing process: request termination, wait for it
// to finish, and only then free it.
ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
{
terminate(process);
process::wait(process);
delete process;
}
// Forwards the detection request to the backing process and returns
// the future produced by ZooKeeperMasterDetectorProcess::detect.
Future<Option<MasterInfo> > ZooKeeperMasterDetector::detect(
const Option<MasterInfo>& previous)
{
return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
}
} // namespace internal {
} // namespace mesos {