blob: 93fe7d946f2eb9eddaa04824feb4a85a1e5a68b5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "master/detector/zookeeper.hpp"
#include <set>
#include <string>
#include <mesos/master/detector.hpp>
#include <mesos/zookeeper/detector.hpp>
#include <mesos/zookeeper/group.hpp>
#include <mesos/zookeeper/url.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
#include <stout/lambda.hpp>
#include <stout/protobuf.hpp>
#include "common/protobuf_utils.hpp"
#include "master/constants.hpp"
using namespace process;
using namespace zookeeper;
using std::set;
using std::string;
namespace mesos {
namespace master {
namespace detector {
// ZooKeeper session timeout for the master detector (10 seconds).
// NOTE(review): presumably used as the default `sessionTimeout` of
// ZooKeeperMasterDetector, declared in the header (not visible here) — confirm.
const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
// libprocess actor backing ZooKeeperMasterDetector. It follows leadership
// of a ZooKeeper group through a LeaderDetector, fetches the data stored
// for the leading member, parses it into a MasterInfo and hands that to
// callers of detect().
class ZooKeeperMasterDetectorProcess
  : public Process<ZooKeeperMasterDetectorProcess>
{
public:
  // Builds (and owns) a new ZooKeeper group from `url`.
  explicit ZooKeeperMasterDetectorProcess(
      const zookeeper::URL& url,
      const Duration& sessionTimeout);

  // Uses the given, already constructed group.
  explicit ZooKeeperMasterDetectorProcess(Owned<Group> group);

  // Discards every detect() future that is still pending.
  ~ZooKeeperMasterDetectorProcess() override;

  // Starts the leader detection loop.
  void initialize() override;

  // Returns the current leader right away if it differs from `previous`.
  // Otherwise returns a future that is completed (or failed) the next time
  // the leader is updated. Fails immediately once a non-retryable error
  // has been recorded in `error`.
  Future<Option<MasterInfo>> detect(const Option<MasterInfo>& previous);

private:
  // Invoked when a caller discards a future returned by detect().
  void discard(const Future<Option<MasterInfo>>& future);

  // Invoked when the group leadership has changed.
  void detected(const Future<Option<Group::Membership>>& leader);

  // Invoked when we have fetched the data associated with the leader.
  void fetched(
      const Group::Membership& membership,
      const Future<Option<string>>& data);

  // NOTE: `group` must remain declared before `detector`: the constructor
  // initializes `detector` from `group.get()`, and members are initialized
  // in declaration order.
  Owned<Group> group;
  LeaderDetector detector;

  // The leading Master.
  Option<MasterInfo> leader;

  // Heap-allocated promises handed out by detect() that are waiting for
  // the next leader update; completed via setPromises/failPromises or
  // discarded via discardPromises.
  set<Promise<Option<MasterInfo>>*> promises;

  // Potential non-retryable error.
  Option<Error> error;
};
// Creates a ZooKeeper group from the servers, path and credentials in
// `url`, then delegates to the Group-based constructor, which takes
// ownership of it.
ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
    const zookeeper::URL& url,
    const Duration& sessionTimeout)
  : ZooKeeperMasterDetectorProcess(
        Owned<Group>(new Group(
            url.servers, sessionTimeout, url.path, url.authentication))) {}
// Constructor that the URL-based constructor delegates to. It stores the
// group and builds the leader detector on top of it. No leader is known
// until detection reports one.
ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
    Owned<Group> _group)
  : ProcessBase(ID::generate("zookeeper-master-detector")),
    group(_group),
    // Relies on `group` being declared (and therefore initialized) before
    // `detector` in the class definition.
    detector(group.get()),
    leader(None()) {}
ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
{
  // Callers still blocked in detect() get their futures discarded
  // instead of being left pending.
  discardPromises(&this->promises);
}
void ZooKeeperMasterDetectorProcess::initialize()
{
detector.detect()
.onAny(defer(self(), &Self::detected, lambda::_1));
}
void ZooKeeperMasterDetectorProcess::discard(
const Future<Option<MasterInfo>>& future)
{
// Discard the promise holding this future.
discardPromises(&promises, future);
}
Future<Option<MasterInfo>> ZooKeeperMasterDetectorProcess::detect(
    const Option<MasterInfo>& previous)
{
  // A non-retryable error put the detector out of service: fail fast.
  if (error.isSome()) {
    return Failure(error->message);
  }

  // The caller's view is already out of date; answer right away.
  if (leader != previous) {
    return leader;
  }

  // Otherwise park the caller until the leader is next updated. The
  // promise is tracked in `promises` until it is completed or discarded.
  Promise<Option<MasterInfo>>* pending = new Promise<Option<MasterInfo>>();
  Future<Option<MasterInfo>> result = pending->future();
  result.onDiscard(defer(self(), &Self::discard, result));
  promises.insert(pending);

  return result;
}
void ZooKeeperMasterDetectorProcess::detected(
const Future<Option<Group::Membership>>& _leader)
{
CHECK(!_leader.isDiscarded());
if (_leader.isFailed()) {
LOG(ERROR) << "Failed to detect the leader: " << _leader.failure();
// Setting this error stops the detection loop and the detector
// transitions to an erroneous state. Further calls to detect()
// will directly fail as a result.
error = Error(_leader.failure());
leader = None();
failPromises(&promises, _leader.failure());
return;
}
if (_leader->isNone()) {
leader = None();
setPromises(&promises, leader);
} else {
// Fetch the data associated with the leader.
group->data(_leader->get())
.onAny(defer(self(), &Self::fetched, _leader->get(), lambda::_1));
}
// Keep trying to detect leadership changes.
detector.detect(_leader.get())
.onAny(defer(self(), &Self::detected, lambda::_1));
}
void ZooKeeperMasterDetectorProcess::fetched(
const Group::Membership& membership,
const Future<Option<string>>& data)
{
CHECK(!data.isDiscarded());
if (data.isFailed()) {
leader = None();
failPromises(&promises, data.failure());
return;
} else if (data->isNone()) {
// Membership is gone before we can read its data.
leader = None();
setPromises(&promises, leader);
return;
}
// Parse the data based on the membership label and cache the
// leader for subsequent requests.
Option<string> label = membership.label();
if (label.isNone()) {
// If we are here it means some masters are still creating znodes
// with the old format.
UPID pid = UPID(data->get());
LOG(WARNING) << "Leading master " << pid << " has data in old format";
leader = mesos::internal::protobuf::createMasterInfo(pid);
} else if (label.isSome() &&
label.get() == internal::master::MASTER_INFO_LABEL) {
MasterInfo info;
if (!info.ParseFromString(data->get())) {
leader = None();
failPromises(&promises,
"Failed to parse data into MasterInfo");
return;
}
LOG(WARNING) << "Leading master " << info.pid()
<< " is using a Protobuf binary format when registering with "
<< "ZooKeeper (" << label.get() << "): this will be deprecated"
<< " as of Mesos 0.24 (see MESOS-2340)";
leader = info;
} else if (label.isSome() &&
label.get() == internal::master::MASTER_INFO_JSON_LABEL) {
Try<JSON::Object> object = JSON::parse<JSON::Object>(data->get());
if (object.isError()) {
leader = None();
failPromises(
&promises,
"Failed to parse data into valid JSON: " + object.error());
return;
}
Try<mesos::MasterInfo> info =
::protobuf::parse<mesos::MasterInfo>(object.get());
if (info.isError()) {
leader = None();
failPromises(
&promises,
"Failed to parse JSON into a valid MasterInfo protocol buffer: " +
info.error());
return;
}
leader = info.get();
} else {
leader = None();
failPromises(
&promises,
"Failed to parse data of unknown label '" + label.get() + "'");
return;
}
LOG(INFO) << "A new leading master (UPID="
<< UPID(leader->pid()) << ") is detected";
setPromises(&promises, leader);
}
ZooKeeperMasterDetector::ZooKeeperMasterDetector(
    const zookeeper::URL& url,
    const Duration& sessionTimeout)
{
  // The detector owns its backing process: it is allocated and started
  // here, then terminated, awaited and freed in the destructor.
  process = new ZooKeeperMasterDetectorProcess(url, sessionTimeout);
  process::spawn(process);
}
ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
{
  // Same as the URL-based constructor, but detection runs over a group
  // supplied by the caller.
  process = new ZooKeeperMasterDetectorProcess(group);
  process::spawn(process);
}
// Shuts the detector down. The order matters: ask the process to
// terminate, block until it has actually exited, and only then free it,
// so the process is never deleted while it may still be running.
ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
{
  terminate(process);
  process::wait(process);
  delete process;
}
Future<Option<MasterInfo>> ZooKeeperMasterDetector::detect(
    const Option<MasterInfo>& previous)
{
  // Forward the request to the backing process via libprocess dispatch.
  // The answer arrives through the returned future.
  return process::dispatch(
      process, &ZooKeeperMasterDetectorProcess::detect, previous);
}
} // namespace detector {
} // namespace master {
} // namespace mesos {