blob: 6436b8ee7e1ab6451a6b999a1cfbb2f79190e6ca [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <set>
#include <string>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/logging.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include "common/protobuf_utils.hpp"
#include "master/constants.hpp"
#include "master/detector.hpp"
#include "master/master.hpp"
#include "messages/messages.hpp"
#include "zookeeper/detector.hpp"
#include "zookeeper/group.hpp"
#include "zookeeper/url.hpp"
using namespace process;
using namespace zookeeper;
using std::set;
using std::string;
namespace mesos {
namespace internal {
// ZooKeeper session timeout passed to the Group that
// ZooKeeperMasterDetectorProcess builds from a URL.
const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT(Seconds(10));
// TODO(bmahler): Consider moving these kinds of helpers into
// libprocess or a common header within mesos.
//
// All helpers below take ownership of the heap-allocated promises in
// the set: every promise they complete is deleted and removed.
namespace promises {

// Helper for setting a set of Promises: completes each promise with
// 't', deletes it, and leaves 'promises' empty.
template <typename T>
void set(std::set<Promise<T>* >* promises, const T& t)
{
  foreach (Promise<T>* promise, *promises) {
    promise->set(t);
    delete promise;
  }
  promises->clear();
}


// Helper for failing a set of Promises: fails each promise with
// 'failure', deletes it, and leaves 'promises' empty.
template <typename T>
void fail(std::set<Promise<T>* >* promises, const string& failure)
{
  // Iterate with the generic element type so this works for any T,
  // matching set() and discard().
  foreach (Promise<T>* promise, *promises) {
    promise->fail(failure);
    delete promise;
  }
  promises->clear();
}


// Helper for discarding a set of Promises: discards each promise,
// deletes it, and leaves 'promises' empty.
template <typename T>
void discard(std::set<Promise<T>* >* promises)
{
  foreach (Promise<T>* promise, *promises) {
    promise->discard();
    delete promise;
  }
  promises->clear();
}


// Helper for discarding an individual promise in the set: finds the
// promise whose future is 'future', removes it from 'promises',
// discards it and deletes it. Does nothing if no promise matches
// (e.g. it was already completed and removed).
template <typename T>
void discard(std::set<Promise<T>* >* promises, const Future<T>& future)
{
  typename std::set<Promise<T>* >::iterator it = promises->begin();
  for (; it != promises->end(); ++it) {
    Promise<T>* promise = *it;
    if (promise->future() == future) {
      // Erase through the iterator first so the set never holds a
      // pointer to a promise that is being discarded or deleted.
      promises->erase(it);
      promise->discard();
      delete promise;
      return;
    }
  }
}

} // namespace promises {
// Process backing StandaloneMasterDetector. It holds the explicitly
// appointed leader (if any) and parks detect() callers on promises
// until the next appoint().
class StandaloneMasterDetectorProcess
  : public Process<StandaloneMasterDetectorProcess>
{
public:
  // No leader appointed yet.
  StandaloneMasterDetectorProcess()
    : ProcessBase(ID::generate("standalone-master-detector")) {}

  // Starts with '_leader' already appointed.
  explicit StandaloneMasterDetectorProcess(const MasterInfo& _leader)
    : ProcessBase(ID::generate("standalone-master-detector")),
      leader(_leader) {}

  // Any detect() future still pending is discarded (and its promise
  // freed) when the process is destroyed.
  ~StandaloneMasterDetectorProcess()
  {
    promises::discard(&promises);
  }

  // Replaces the appointed leader and completes every pending detect()
  // future with the new value.
  void appoint(const Option<MasterInfo>& leader_)
  {
    leader = leader_;
    promises::set(&promises, leader);
  }

  // Answers immediately when the appointed leader differs from
  // 'previous'; otherwise the returned future completes on the next
  // appoint() (or is discarded when this process is destroyed).
  Future<Option<MasterInfo> > detect(
      const Option<MasterInfo>& previous = None())
  {
    if (leader != previous) {
      return leader;
    }

    return waitForAppointment();
  }

private:
  // Registers a fresh promise for a parked detect() caller. If the
  // caller discards the returned future, discard() drops the promise.
  Future<Option<MasterInfo> > waitForAppointment()
  {
    Promise<Option<MasterInfo> >* waiter = new Promise<Option<MasterInfo> >();
    Future<Option<MasterInfo> > pending = waiter->future();
    pending.onDiscard(defer(self(), &Self::discard, pending));
    promises.insert(waiter);
    return pending;
  }

  // Discard the promise holding this future.
  void discard(const Future<Option<MasterInfo> >& future)
  {
    promises::discard(&promises, future);
  }

  Option<MasterInfo> leader; // The appointed master.
  set<Promise<Option<MasterInfo> >*> promises;
};
// Process backing ZooKeeperMasterDetector. It follows the leading
// member of a ZooKeeper group, fetches that member's data to build a
// MasterInfo, and completes pending detect() futures with the result.
class ZooKeeperMasterDetectorProcess
: public Process<ZooKeeperMasterDetectorProcess>
{
public:
explicit ZooKeeperMasterDetectorProcess(const URL& url);
explicit ZooKeeperMasterDetectorProcess(Owned<Group> group);
~ZooKeeperMasterDetectorProcess();
// Starts the leader detection loop.
virtual void initialize();
// Returns the cached leader if it differs from 'previous', otherwise a
// future completed on the next leader update. Fails immediately once a
// non-retryable error has been recorded.
Future<Option<MasterInfo> > detect(const Option<MasterInfo>& previous);
private:
// Invoked when a future returned by detect() is discarded by its caller.
void discard(const Future<Option<MasterInfo> >& future);
// Invoked when the group leadership has changed.
void detected(const Future<Option<Group::Membership> >& leader);
// Invoked when we have fetched the data associated with the leader.
void fetched(const Group::Membership& membership, const Future<string>& data);
// NOTE: 'group' must stay declared before 'detector'. The constructors
// initialize 'detector' from 'group.get()', and members are initialized
// in declaration order.
Owned<Group> group;
LeaderDetector detector;
// The leading Master.
Option<MasterInfo> leader;
// Promises of pending detect() calls; deleted by the promises:: helpers
// when completed.
set<Promise<Option<MasterInfo> >*> promises;
// Potential non-retryable error.
Option<Error> error;
};
// Creates a MasterDetector from the 'master' specification:
//   - ""             : a StandaloneMasterDetector with no leader;
//   - "zk://..."     : a ZooKeeperMasterDetector for that URL (a chroot
//                      path other than "/" is required);
//   - "file://<path>": the trimmed contents of <path>, interpreted
//                      recursively with these same rules;
//   - anything else  : parsed as a UPID (prefixing "master@" when it is
//                      not already present) and used as the leader of a
//                      StandaloneMasterDetector.
// Returns an Error if the specification cannot be parsed or the file
// cannot be read. The caller owns the returned detector.
Try<MasterDetector*> MasterDetector::create(const string& master)
{
  if (master == "") {
    return new StandaloneMasterDetector();
  } else if (master.find("zk://") == 0) {
    Try<URL> url = URL::parse(master);
    if (url.isError()) {
      return Error(url.error());
    }
    if (url.get().path == "/") {
      return Error(
          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
    }
    return new ZooKeeperMasterDetector(url.get());
  } else if (master.find("file://") == 0) {
    // Strip the 7-character "file://" prefix. Hold the path by value
    // rather than binding a reference to the substr() temporary.
    const string path = master.substr(7);
    const Try<string> read = os::read(path);
    if (read.isError()) {
      // Keep the underlying reason; it is the only clue for the operator.
      return Error("Failed to read from file at '" + path + "': " +
                   read.error());
    }
    return create(strings::trim(read.get()));
  }

  // Okay, try and parse what we got as a PID.
  UPID pid = master.find("master@") == 0
    ? UPID(master)
    : UPID("master@" + master);

  if (!pid) {
    return Error("Failed to parse '" + master + "'");
  }

  return new StandaloneMasterDetector(protobuf::createMasterInfo(pid));
}
// Out-of-line definition of the (empty) MasterDetector destructor.
MasterDetector::~MasterDetector()
{
}
// Spawns a backing process with no appointed leader.
StandaloneMasterDetector::StandaloneMasterDetector()
{
  process = new StandaloneMasterDetectorProcess();
  spawn(process);
}


// Spawns a backing process whose initial leader is 'leader'.
StandaloneMasterDetector::StandaloneMasterDetector(const MasterInfo& leader)
{
  process = new StandaloneMasterDetectorProcess(leader);
  spawn(process);
}


// Spawns a backing process whose initial leader is built from the
// PID 'leader'.
StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader)
{
  const MasterInfo info = protobuf::createMasterInfo(leader);
  process = new StandaloneMasterDetectorProcess(info);
  spawn(process);
}


// Terminates the backing process, waits for it to exit, then frees it.
StandaloneMasterDetector::~StandaloneMasterDetector()
{
  terminate(process);
  process::wait(process);
  delete process;
}
void StandaloneMasterDetector::appoint(const Option<MasterInfo>& leader)
{
dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
}
void StandaloneMasterDetector::appoint(const UPID& leader)
{
dispatch(process,
&StandaloneMasterDetectorProcess::appoint,
protobuf::createMasterInfo(leader));
}
Future<Option<MasterInfo> > StandaloneMasterDetector::detect(
const Option<MasterInfo>& previous)
{
return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
}
// TODO(benh): Get ZooKeeper timeout from configuration.
// TODO(xujyan): Use peer constructor after switching to C++ 11.
//
// Creates a Group for the servers, path and credentials in 'url'
// (session timeout MASTER_DETECTOR_ZK_SESSION_TIMEOUT) and a
// LeaderDetector on top of it.
ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
    const URL& url)
  : ProcessBase(ID::generate("zookeeper-master-detector")),
    group(new Group(
        url.servers,
        MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
        url.path,
        url.authentication)),
    detector(group.get()),
    leader(None()) {}


// Same as the URL constructor, but uses a caller-supplied Group
// instead of creating one.
ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
    Owned<Group> _group)
  : ProcessBase(ID::generate("zookeeper-master-detector")),
    group(_group),
    detector(group.get()),
    leader(None()) {}
// Discards (and frees) every detect() promise still pending when the
// process is destroyed.
ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
{
  promises::discard(&promises);
}
void ZooKeeperMasterDetectorProcess::initialize()
{
detector.detect()
.onAny(defer(self(), &Self::detected, lambda::_1));
}
// Runs when a caller discards a future returned by detect(): removes,
// discards and frees the promise behind it (no-op if already completed).
void ZooKeeperMasterDetectorProcess::discard(
    const Future<Option<MasterInfo> >& future)
{
  promises::discard(&promises, future);
}
// Returns the cached leader when it differs from 'previous'. Otherwise
// returns a future that detected()/fetched() later complete or fail.
// Once a non-retryable error has been recorded, fails right away.
Future<Option<MasterInfo> > ZooKeeperMasterDetectorProcess::detect(
    const Option<MasterInfo>& previous)
{
  // The detector is permanently out of service after such an error.
  if (error.isSome()) {
    return Failure(error.get().message);
  }

  // Report a known change immediately.
  if (leader != previous) {
    return leader;
  }

  // Nothing new yet: park the caller on a promise.
  Promise<Option<MasterInfo> >* waiter = new Promise<Option<MasterInfo> >();
  Future<Option<MasterInfo> > pending = waiter->future();
  pending.onDiscard(defer(self(), &Self::discard, pending));
  promises.insert(waiter);
  return pending;
}
// Callback for every LeaderDetector::detect() result.
//
// On failure: records a non-retryable 'error' (later detect() calls
// then fail immediately), clears the cached leader, fails all pending
// promises and does NOT re-arm detection.
// Otherwise: with no leading membership, publishes None to all pending
// promises right away. With a leading membership, asynchronously
// fetches its data, which fetched() handles. In both cases detection
// is re-armed with the membership just observed.
void ZooKeeperMasterDetectorProcess::detected(
const Future<Option<Group::Membership> >& _leader)
{
// A discarded result is treated as a programming error (CHECK aborts).
CHECK(!_leader.isDiscarded());
if (_leader.isFailed()) {
LOG(ERROR) << "Failed to detect the leader: " << _leader.failure();
// Setting this error stops the detection loop and the detector
// transitions to an erroneous state. Further calls to detect()
// will directly fail as a result.
error = Error(_leader.failure());
leader = None();
promises::fail(&promises, _leader.failure());
return;
}
if (_leader.get().isNone()) {
// No leading member: publish "no leader" to all waiters now.
leader = None();
promises::set(&promises, leader);
} else {
// Fetch the data associated with the leader.
// NOTE(review): detection is re-armed below before this fetch
// completes, and fetched() does not check that the membership is still
// the current leader. A late reply for an old leader could therefore
// overwrite a newer result (e.g. a subsequent "no leader"). Confirm
// whether Group::data() ordering rules this out.
group->data(_leader.get().get())
.onAny(defer(self(), &Self::fetched, _leader.get().get(), lambda::_1));
}
// Keep trying to detect leadership changes.
detector.detect(_leader.get())
.onAny(defer(self(), &Self::detected, lambda::_1));
}
// Invoked once the data stored for the leading 'membership' has been
// fetched. Parses it into a MasterInfo (the format is selected by the
// membership label), caches it in 'leader', and completes all pending
// detect() promises with it. A fetch or parse failure instead clears
// the cached leader and fails the pending promises. Detection keeps
// running either way, since only detected() records a permanent error.
void ZooKeeperMasterDetectorProcess::fetched(
    const Group::Membership& membership,
    const Future<string>& data)
{
  CHECK(!data.isDiscarded());

  if (data.isFailed()) {
    leader = None();
    promises::fail(&promises, data.failure());
    return;
  }

  // Parse the data based on the membership label and cache the
  // leader for subsequent requests.
  Option<string> label = membership.label();
  if (label.isNone()) {
    // If we are here it means some masters are still creating znodes
    // with the old format, where the data is the master's UPID string.
    UPID pid = UPID(data.get());
    if (!pid) {
      // Reject an unparsable UPID instead of publishing a MasterInfo
      // built from it. This mirrors the MasterInfo parse failure below.
      leader = None();
      promises::fail(&promises, "Failed to parse data into UPID");
      return;
    }
    LOG(WARNING) << "Leading master " << pid << " has data in old format";
    leader = protobuf::createMasterInfo(pid);
  } else if (label.get() == master::MASTER_INFO_LABEL) {
    MasterInfo info;
    if (!info.ParseFromString(data.get())) {
      leader = None();
      promises::fail(&promises, "Failed to parse data into MasterInfo");
      return;
    }
    leader = info;
  } else {
    leader = None();
    promises::fail(
        &promises,
        "Failed to parse data of unknown label '" + label.get() + "'");
    return;
  }

  LOG(INFO) << "A new leading master (UPID="
            << UPID(leader.get().pid()) << ") is detected";
  promises::set(&promises, leader);
}
// Spawns a backing process that creates its own Group from 'url'.
ZooKeeperMasterDetector::ZooKeeperMasterDetector(const URL& url)
{
  process = new ZooKeeperMasterDetectorProcess(url);
  spawn(process);
}


// Spawns a backing process on top of the caller-supplied 'group'.
ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
{
  process = new ZooKeeperMasterDetectorProcess(group);
  spawn(process);
}


// Terminates the backing process, waits for it to exit, then frees it.
ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
{
  terminate(process);
  process::wait(process);
  delete process;
}
// Forwards to ZooKeeperMasterDetectorProcess::detect() on the process.
Future<Option<MasterInfo> > ZooKeeperMasterDetector::detect(
    const Option<MasterInfo>& previous)
{
  Future<Option<MasterInfo> > result =
    dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
  return result;
}
} // namespace internal {
} // namespace mesos {