blob: c1bf82b621d6b46afe001acafe9ee53336726406 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <process/defer.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <stout/check.hpp>
#include <stout/lambda.hpp>
#include "master/constants.hpp"
#include "master/contender.hpp"
#include "master/master.hpp"
#include "zookeeper/contender.hpp"
#include "zookeeper/detector.hpp"
#include "zookeeper/group.hpp"
#include "zookeeper/url.hpp"
using std::string;
using namespace process;
using namespace zookeeper;
namespace mesos {
namespace internal {
using namespace master;
const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT = Seconds(10);
class ZooKeeperMasterContenderProcess
: public Process<ZooKeeperMasterContenderProcess>
{
public:
explicit ZooKeeperMasterContenderProcess(const zookeeper::URL& url);
explicit ZooKeeperMasterContenderProcess(Owned<zookeeper::Group> group);
virtual ~ZooKeeperMasterContenderProcess();
// Explicitely use 'initialize' since we're overloading below.
using process::ProcessBase::initialize;
void initialize(const MasterInfo& masterInfo);
// MasterContender implementation.
virtual Future<Future<Nothing> > contend();
private:
Owned<zookeeper::Group> group;
LeaderContender* contender;
// The master this contender contends on behalf of.
Option<MasterInfo> masterInfo;
Option<Future<Future<Nothing> > > candidacy;
};
Try<MasterContender*> MasterContender::create(const string& zk)
{
if (zk == "") {
return new StandaloneMasterContender();
} else if (strings::startsWith(zk, "zk://")) {
Try<zookeeper::URL> url = URL::parse(zk);
if (url.isError()) {
return Error(url.error());
}
if (url.get().path == "/") {
return Error(
"Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
}
return new ZooKeeperMasterContender(url.get());
} else if (strings::startsWith(zk, "file://")) {
const string& path = zk.substr(7);
const Try<string> read = os::read(path);
if (read.isError()) {
return Error("Failed to read from file at '" + path + "'");
}
return create(strings::trim(read.get()));
}
return Error("Failed to parse '" + zk + "'");
}
MasterContender::~MasterContender() {}
StandaloneMasterContender::~StandaloneMasterContender()
{
if (promise != NULL) {
promise->set(Nothing()); // Leadership lost.
delete promise;
}
}
void StandaloneMasterContender::initialize(const MasterInfo& masterInfo)
{
// We don't really need to store the master in this basic
// implementation so we just restore an 'initialized' flag to make
// sure it is called.
initialized = true;
}
Future<Future<Nothing> > StandaloneMasterContender::contend()
{
if (!initialized) {
return Failure("Initialize the contender first");
}
if (promise != NULL) {
LOG(INFO) << "Withdrawing the previous membership before recontending";
promise->set(Nothing());
delete promise;
}
// Directly return a future that is always pending because it
// represents a membership/leadership that is not going to be lost
// until we 'withdraw'.
promise = new Promise<Nothing>();
return promise->future();
}
ZooKeeperMasterContender::ZooKeeperMasterContender(const URL& url)
{
process = new ZooKeeperMasterContenderProcess(url);
spawn(process);
}
ZooKeeperMasterContender::ZooKeeperMasterContender(Owned<Group> group)
{
process = new ZooKeeperMasterContenderProcess(group);
spawn(process);
}
ZooKeeperMasterContender::~ZooKeeperMasterContender()
{
terminate(process);
process::wait(process);
delete process;
}
void ZooKeeperMasterContender::initialize(const MasterInfo& masterInfo)
{
process->initialize(masterInfo);
}
Future<Future<Nothing> > ZooKeeperMasterContender::contend()
{
return dispatch(process, &ZooKeeperMasterContenderProcess::contend);
}
ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
const URL& url)
: ProcessBase(ID::generate("zookeeper-master-contender")),
group(new Group(url, MASTER_CONTENDER_ZK_SESSION_TIMEOUT)),
contender(NULL) {}
ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
Owned<Group> _group)
: ProcessBase(ID::generate("zookeeper-master-contender")),
group(_group),
contender(NULL) {}
ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
{
delete contender;
}
void ZooKeeperMasterContenderProcess::initialize(const MasterInfo& _masterInfo)
{
masterInfo = _masterInfo;
}
Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
{
if (masterInfo.isNone()) {
return Failure("Initialize the contender first");
}
// Should not recontend if the last election is still ongoing.
if (candidacy.isSome() && candidacy.get().isPending()) {
return candidacy.get();
}
if (contender != NULL) {
LOG(INFO) << "Withdrawing the previous membership before recontending";
delete contender;
}
// Serialize the MasterInfo to string.
string data;
if (!masterInfo.get().SerializeToString(&data)) {
return Failure("Failed to serialize data to MasterInfo");
}
contender = new LeaderContender(group.get(), data, master::MASTER_INFO_LABEL);
candidacy = contender->contend();
return candidacy.get();
}
} // namespace internal {
} // namespace mesos {