blob: d62ef8376811ebdf5d5c88f1e6595786f05459c1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "master/contender/zookeeper.hpp"
#include <mesos/master/contender.hpp>
#include <mesos/zookeeper/contender.hpp>
#include <mesos/zookeeper/group.hpp>
#include <mesos/zookeeper/url.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <stout/protobuf.hpp>
#include "master/constants.hpp"
using std::string;
using namespace process;
using namespace zookeeper;
namespace mesos {
namespace master {
namespace contender {
const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT = Seconds(10);
class ZooKeeperMasterContenderProcess
: public Process<ZooKeeperMasterContenderProcess>
{
public:
explicit ZooKeeperMasterContenderProcess(
const zookeeper::URL& url,
const Duration& sessionTimeout);
explicit ZooKeeperMasterContenderProcess(Owned<zookeeper::Group> group);
~ZooKeeperMasterContenderProcess() override;
// Explicitly use 'initialize' since we're overloading below.
using process::ProcessBase::initialize;
void initialize(const MasterInfo& masterInfo);
// MasterContender implementation.
virtual Future<Future<Nothing>> contend();
private:
Owned<zookeeper::Group> group;
LeaderContender* contender;
// The master this contender contends on behalf of.
Option<MasterInfo> masterInfo;
Option<Future<Future<Nothing>>> candidacy;
};
ZooKeeperMasterContender::ZooKeeperMasterContender(
const zookeeper::URL& url,
const Duration& sessionTimeout)
{
process = new ZooKeeperMasterContenderProcess(url, sessionTimeout);
spawn(process);
}
ZooKeeperMasterContender::ZooKeeperMasterContender(Owned<Group> group)
{
process = new ZooKeeperMasterContenderProcess(group);
spawn(process);
}
ZooKeeperMasterContender::~ZooKeeperMasterContender()
{
terminate(process);
process::wait(process);
delete process;
}
void ZooKeeperMasterContender::initialize(const MasterInfo& masterInfo)
{
process->initialize(masterInfo);
}
Future<Future<Nothing>> ZooKeeperMasterContender::contend()
{
return dispatch(process, &ZooKeeperMasterContenderProcess::contend);
}
ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
const zookeeper::URL& url,
const Duration& sessionTimeout)
: ZooKeeperMasterContenderProcess(Owned<Group>(
new Group(url, sessionTimeout))) {}
ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
Owned<Group> _group)
: ProcessBase(ID::generate("zookeeper-master-contender")),
group(_group),
contender(nullptr) {}
ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
{
delete contender;
}
void ZooKeeperMasterContenderProcess::initialize(const MasterInfo& _masterInfo)
{
masterInfo = _masterInfo;
}
Future<Future<Nothing>> ZooKeeperMasterContenderProcess::contend()
{
if (masterInfo.isNone()) {
return Failure("Initialize the contender first");
}
// Should not recontend if the last election is still ongoing.
if (candidacy.isSome() && candidacy->isPending()) {
return candidacy.get();
}
if (contender != nullptr) {
LOG(INFO) << "Withdrawing the previous membership before recontending";
delete contender;
}
// Serialize the MasterInfo to JSON.
JSON::Object json = JSON::protobuf(masterInfo.get());
contender = new LeaderContender(
group.get(),
stringify(json),
mesos::internal::master::MASTER_INFO_JSON_LABEL);
candidacy = contender->contend();
return candidacy.get();
}
} // namespace contender {
} // namespace master {
} // namespace mesos {