// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include <algorithm>
#include <queue>
#include <utility>
#include <vector>
#include <mesos/zookeeper/group.hpp>
#include <mesos/zookeeper/watcher.hpp>
#include <mesos/zookeeper/zookeeper.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/none.hpp>
#include <stout/numify.hpp>
#include <stout/path.hpp>
#include <stout/result.hpp>
#include <stout/some.hpp>
#include <stout/strings.hpp>
#include <stout/utils.hpp>
#include <stout/os/constants.hpp>
#include "logging/logging.hpp"
using namespace process;
using process::wait; // Necessary on some OS's to disambiguate.
using std::queue;
using std::set;
using std::string;
using std::vector;
namespace zookeeper {
// Time to wait after retryable errors.
const Duration GroupProcess::RETRY_INTERVAL = Seconds(2);
// Helper for failing a queue of promises.
template <typename T>
void fail(queue<T*>* queue, const string& message)
{
while (!queue->empty()) {
T* t = queue->front();
queue->pop();
t->promise.fail(message);
delete t;
}
}
// Helper for discarding a queue of promises.
template <typename T>
void discard(queue<T*>* queue)
{
while (!queue->empty()) {
T* t = queue->front();
queue->pop();
t->promise.discard();
delete t;
}
}
GroupProcess::GroupProcess(
const string& _servers,
const Duration& _sessionTimeout,
const string& _znode,
const Option<Authentication>& _auth)
: ProcessBase(ID::generate("zookeeper-group")),
servers(_servers),
sessionTimeout(_sessionTimeout),
znode(strings::remove(_znode, "/", strings::SUFFIX)),
auth(_auth),
acl(_auth.isSome()
? EVERYONE_READ_CREATOR_ALL
: ZOO_OPEN_ACL_UNSAFE),
watcher(nullptr),
zk(nullptr),
state(DISCONNECTED),
retrying(false)
{}
GroupProcess::GroupProcess(
const URL& url,
const Duration& sessionTimeout)
: GroupProcess(
url.servers,
sessionTimeout,
strings::remove(url.path, "/", strings::SUFFIX),
url.authentication)
{}
// NB: The `retry` and `connect` timers might still be active. However,
// we don't need to clean them up -- when the timers fire, they will
// attempt to dispatch to a no-longer-valid PID, which is a no-op.
GroupProcess::~GroupProcess()
{
discard(&pending.joins);
discard(&pending.cancels);
discard(&pending.datas);
discard(&pending.watches);
delete zk;
delete watcher;
}
void GroupProcess::initialize()
{
// Doing initialization here allows us to avoid the race between
// instantiating the ZooKeeper instance and being spawned ourselves.
startConnection();
}
void GroupProcess::startConnection()
{
watcher = new ProcessWatcher<GroupProcess>(self());
zk = new ZooKeeper(servers, sessionTimeout, watcher);
state = CONNECTING;
// If the connection is not established within the session timeout,
// close the ZooKeeper handle and create a new one. This is
// important because the ZooKeeper 3.4 client libraries don't try to
// re-resolve the list of hostnames, so we create a new ZooKeeper
// handle to ensure we observe DNS changes. See MESOS-4546 and
// `ZooKeeperProcess::initialize` for more information.
CHECK_NONE(connectTimer);
connectTimer = delay(zk->getSessionTimeout(),
self(),
&Self::timedout,
zk->getSessionId());
}
Future<Group::Membership> GroupProcess::join(
const string& data,
const Option<string>& label)
{
if (error.isSome()) {
return Failure(error.get());
} else if (state != READY) {
Join* join = new Join(data, label);
pending.joins.push(join);
return join->promise.future();
}
// TODO(benh): Write a test to see how ZooKeeper fails setting znode
// data when the data is larger than 1 MB so we know whether or not
// to check for that here.
// TODO(benh): Only attempt if the pending queue is empty so that a
// client can assume a happens-before ordering of operations (i.e.,
// the first request will happen before the second, etc).
Result<Group::Membership> membership = doJoin(data, label);
if (membership.isNone()) { // Try again later.
if (!retrying) {
delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
retrying = true;
}
Join* join = new Join(data, label);
pending.joins.push(join);
return join->promise.future();
} else if (membership.isError()) {
return Failure(membership.error());
}
return membership.get();
}
Future<bool> GroupProcess::cancel(const Group::Membership& membership)
{
if (error.isSome()) {
return Failure(error.get());
} else if (owned.count(membership.id()) == 0) {
// TODO(benh): Should this be an error? Right now a user can't
// differentiate when 'false' means they can't cancel because it's
// not owned or because it's already been cancelled (explicitly by
// them or implicitly due to session expiration or operator
// error).
return false;
}
if (state != READY) {
Cancel* cancel = new Cancel(membership);
pending.cancels.push(cancel);
return cancel->promise.future();
}
// TODO(benh): Only attempt if the pending queue is empty so that a
// client can assume a happens-before ordering of operations (i.e.,
// the first request will happen before the second, etc).
Result<bool> cancellation = doCancel(membership);
if (cancellation.isNone()) { // Try again later.
if (!retrying) {
delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
retrying = true;
}
Cancel* cancel = new Cancel(membership);
pending.cancels.push(cancel);
return cancel->promise.future();
} else if (cancellation.isError()) {
return Failure(cancellation.error());
}
return cancellation.get();
}
Future<Option<string>> GroupProcess::data(const Group::Membership& membership)
{
if (error.isSome()) {
return Failure(error.get());
} else if (state != READY) {
Data* data = new Data(membership);
pending.datas.push(data);
return data->promise.future();
}
// TODO(benh): Only attempt if the pending queue is empty so that a
// client can assume a happens-before ordering of operations (i.e.,
// the first request will happen before the second, etc).
Result<Option<string>> result = doData(membership);
if (result.isNone()) { // Try again later.
Data* data = new Data(membership);
pending.datas.push(data);
return data->promise.future();
} else if (result.isError()) {
return Failure(result.error());
}
return result.get();
}
Future<set<Group::Membership>> GroupProcess::watch(
const set<Group::Membership>& expected)
{
if (error.isSome()) {
return Failure(error.get());
} else if (state != READY) {
Watch* watch = new Watch(expected);
pending.watches.push(watch);
return watch->promise.future();
}
// To guarantee causality, we must invalidate our cache of
// memberships after any updates are made to the group (i.e., joins
// and cancels). This is because a client that just learned of a
// successful join shouldn't invoke watch and get a set of
// memberships without their membership present (which is possible
// if we return a cache of memberships that hasn't yet been updated
// via a ZooKeeper event) unless that membership has since expired
// (or been deleted, e.g., via operator error). Thus, we do a
// membership "roll call" for each watch in order to make sure all
// causal relationships are satisfied.
if (memberships.isNone()) {
Try<bool> cached = cache();
if (cached.isError()) {
// Non-retryable error.
return Failure(cached.error());
} else if (!cached.get()) {
CHECK_NONE(memberships);
// Try again later.
if (!retrying) {
delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
retrying = true;
}
Watch* watch = new Watch(expected);
pending.watches.push(watch);
return watch->promise.future();
}
}
CHECK_SOME(memberships);
if (memberships.get() == expected) { // Just wait for updates.
Watch* watch = new Watch(expected);
pending.watches.push(watch);
return watch->promise.future();
}
return memberships.get();
}
Future<Option<int64_t>> GroupProcess::session()
{
if (error.isSome()) {
return Failure(error.get());
} else if (state == CONNECTING) {
return None();
}
return Some(zk->getSessionId());
}
void GroupProcess::connected(int64_t sessionId, bool reconnect)
{
if (error.isSome() || sessionId != zk->getSessionId()) {
return;
}
LOG(INFO) << "Group process (" << self() << ") "
<< (reconnect ? "reconnected" : "connected") << " to ZooKeeper";
if (!reconnect) {
// This is the first time this ZooKeeper client connects to the
// ZooKeeper service. (It could be the first time for the group, or
// it could follow a session expiration, which causes a new
// ZooKeeper client instance to be created.)
CHECK_EQ(state, CONNECTING);
state = CONNECTED;
} else {
// This means we are reconnecting within the same ZooKeeper
// session. We could have completed authenticate() or create()
// before we lost the connection (thus the state can be any of the
// following three), so 'sync()' below will check the state and
// only execute the necessary operations accordingly.
CHECK(state == CONNECTED || state == AUTHENTICATED || state == READY)
<< state;
}
// Cancel and cleanup the connect timer. The timer should always be
// set, because it is set before making the initial connection
// attempt and whenever a reconnection attempt is made.
CHECK_SOME(connectTimer);
// Now that we are connected, we'll learn about a subsequent
// disconnection event via the `reconnecting` callback. At that
// point we'll also restart the `connectTimer` to ensure we retry
// the reconnection attempt.
Clock::cancel(connectTimer.get());
connectTimer = None();
// Sync group operations (and set up the group on ZK).
Try<bool> synced = sync();
if (synced.isError()) {
// Non-retryable error. Abort.
abort(synced.error());
} else if (!synced.get()) {
// Retryable error.
if (!retrying) {
delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
retrying = true;
}
}
}
Try<bool> GroupProcess::authenticate()
{
CHECK_EQ(state, CONNECTED);
// Authenticate if necessary.
if (auth.isSome()) {
LOG(INFO) << "Authenticating with ZooKeeper using " << auth->scheme;
int code = zk->authenticate(auth->scheme, auth->credentials);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
return false;
} else if (code != ZOK) {
return Error(
"Failed to authenticate with ZooKeeper: " + zk->message(code));
}
}
state = AUTHENTICATED;
return true;
}
Try<bool> GroupProcess::create()
{
CHECK_EQ(state, AUTHENTICATED);
// Create znode path (including intermediate znodes) as necessary.
CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
LOG(INFO) << "Trying to create path '" << znode << "' in ZooKeeper";
int code = zk->create(znode, "", acl, 0, nullptr, true);
// We fail all non-retryable return codes except ZNODEEXISTS
// (since that means the path we were trying to create already
// exists). Note that it's also possible we got back a ZNONODE
// because we could not create one of the intermediate znodes (in
// which case we'll abort in the 'else if' below since ZNONODE is
// non-retryable). Also note that it's possible the intermediate
// path exists but we don't have permission to know it; in this
// case we abort as well, to be on the safe side.
// TODO(benh): Need to check that we can also put a watch on the
// children of 'znode'.
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return false;
} else if (code != ZOK && code != ZNODEEXISTS) {
return Error(
"Failed to create '" + znode + "' in ZooKeeper: " + zk->message(code));
}
state = READY;
return true;
}
void GroupProcess::reconnecting(int64_t sessionId)
{
if (error.isSome() || sessionId != zk->getSessionId()) {
return;
}
LOG(INFO) << "Lost connection to ZooKeeper, attempting to reconnect ...";
// Set 'retrying' to false to prevent retry() from executing sync()
// before the group reconnects to ZooKeeper. The group will sync
// with ZooKeeper after it is connected.
retrying = false;
// ZooKeeper won't tell us of a session expiration until we
// reconnect, which could occur much later than when the session
// actually expired. This can lead to a prolonged split-brain
// scenario when network partitions occur. Rather than wait for a
// reconnection to occur (i.e., a network partition to be repaired),
// we create a local timer and "expire" our session prematurely if
// we haven't reconnected within the session expiration timeout.
// The timer can be reset if the connection is restored.
// We expect to see exactly one `reconnecting` event when our
// session is disconnected, even if we're disconnected for an
// extended period. Since we clear the `connectTimer` when the
// connection is established, it should still be unset here.
CHECK_NONE(connectTimer);
// Use the negotiated session timeout for the connect timer.
connectTimer = delay(zk->getSessionTimeout(),
self(),
&Self::timedout,
zk->getSessionId());
}
void GroupProcess::timedout(int64_t sessionId)
{
if (error.isSome()) {
return;
}
CHECK_NOTNULL(zk);
// The connect timer can be reset or replaced and `zk`
// can be replaced since this method was dispatched.
if (connectTimer.isSome() &&
connectTimer->timeout().expired() &&
zk->getSessionId() == sessionId) {
LOG(WARNING) << "Timed out waiting to connect to ZooKeeper. "
<< "Forcing ZooKeeper session "
<< "(sessionId=" << std::hex << sessionId << ") expiration";
// Locally determine that the current session has expired.
dispatch(self(), &Self::expired, zk->getSessionId());
}
}
void GroupProcess::expired(int64_t sessionId)
{
if (error.isSome() || sessionId != zk->getSessionId()) {
return;
}
LOG(INFO) << "ZooKeeper session expired";
// Cancel the retries. Group will sync() after it reconnects to ZK.
retrying = false;
// Cancel and cleanup the connect timer (if necessary).
if (connectTimer.isSome()) {
Clock::cancel(connectTimer.get());
connectTimer = None();
}
// From the group's local perspective all the memberships are
// gone, so we need to update the watches.
// If the memberships still exist on ZooKeeper, they will be
// restored in the group after it reconnects to ZK.
// This is a precaution against the possibility that the ZK
// connection is lost right after we recreate the ZK instance
// below, or that the entire ZK cluster goes down. The outage can
// last for a long time, but the clients watching the group should
// be informed sooner.
memberships = set<Group::Membership>();
update();
// Invalidate the cache so that we'll sync with ZK after
// reconnection.
memberships = None();
// Set all owned memberships as cancelled.
foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
cancelled->set(false); // Since this was not requested.
owned.erase(sequence); // Okay since iterating over a copy.
delete cancelled;
}
CHECK(owned.empty());
// Note that we DO NOT clear unowned. The next time we try to cache
// the memberships we'll trigger any cancelled unowned memberships
// then. We could imagine doing this for owned memberships too, but
// for now we proactively cancel them above.
state = DISCONNECTED;
delete CHECK_NOTNULL(zk);
delete CHECK_NOTNULL(watcher);
startConnection();
}
void GroupProcess::updated(int64_t sessionId, const string& path)
{
if (error.isSome() || sessionId != zk->getSessionId()) {
return;
}
CHECK_EQ(znode, path);
Try<bool> cached = cache(); // Update cache (will invalidate first).
if (cached.isError()) {
abort(cached.error()); // Cancel everything pending.
} else if (!cached.get()) {
CHECK_NONE(memberships);
// Try again later.
if (!retrying) {
delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
retrying = true;
}
} else {
update(); // Update any pending watches.
}
}
void GroupProcess::created(int64_t sessionId, const string& path)
{
LOG(FATAL) << "Unexpected ZooKeeper event";
}
void GroupProcess::deleted(int64_t sessionId, const string& path)
{
LOG(FATAL) << "Unexpected ZooKeeper event";
}
Result<Group::Membership> GroupProcess::doJoin(
const string& data,
const Option<string>& label)
{
CHECK_EQ(state, READY);
const string path = znode + "/" + (label.isSome() ? (label.get() + "_") : "");
// Create a new ephemeral node to represent a new member and use
// the specified data as its contents.
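// Because of the ZOO_SEQUENCE flag, ZooKeeper appends a 10-digit
// zero-padded sequence number to the requested path and returns
// the full created path (e.g., "<znode>/label_0000000131") in
// 'result'.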
string result;
const int code = zk->create(
path,
data,
acl,
ZOO_SEQUENCE | ZOO_EPHEMERAL,
&result);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return None();
} else if (code != ZOK) {
return Error(
"Failed to create ephemeral node at '" + path +
"' in ZooKeeper: " + zk->message(code));
}
// Invalidate the cache (it will/should get immediately populated
// via the 'updated' callback of our ZooKeeper watcher).
memberships = None();
// Save the sequence number but only grab the basename. Example:
// "/path/to/znode/label_0000000131" => "0000000131".
const string basename = strings::tokenize(result, "/").back();
// Strip the label before grabbing the sequence number.
const string node = label.isSome()
? strings::remove(basename, label.get() + "_")
: basename;
Try<int32_t> sequence = numify<int32_t>(node);
CHECK_SOME(sequence);
Promise<bool>* cancelled = new Promise<bool>();
owned[sequence.get()] = cancelled;
return Group::Membership(sequence.get(), label, cancelled->future());
}
Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
{
CHECK_EQ(state, READY);
string path = path::join(
znode,
zkBasename(membership),
os::POSIX_PATH_SEPARATOR);
LOG(INFO) << "Trying to remove '" << path << "' in ZooKeeper";
// Remove ephemeral node.
int code = zk->remove(path, -1);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return None();
} else if (code == ZNONODE) {
// This can happen because the membership could have expired but
// we have yet to receive the update about it.
return false;
} else if (code != ZOK) {
return Error(
"Failed to remove ephemeral node '" + path +
"' in ZooKeeper: " + zk->message(code));
}
// Invalidate the cache (it will/should get immediately populated
// via the 'updated' callback of our ZooKeeper watcher).
memberships = None();
// Let anyone waiting know the membership has been cancelled.
CHECK(owned.count(membership.id()) == 1);
Promise<bool>* cancelled = owned[membership.id()];
cancelled->set(true);
owned.erase(membership.id());
delete cancelled;
return true;
}
Result<Option<string>> GroupProcess::doData(
const Group::Membership& membership)
{
CHECK_EQ(state, READY);
string path = path::join(
znode,
zkBasename(membership),
os::POSIX_PATH_SEPARATOR);
LOG(INFO) << "Trying to get '" << path << "' in ZooKeeper";
// Get data associated with ephemeral node.
string result;
int code = zk->get(path, false, &result, nullptr);
if (code == ZNONODE) {
return Option<string>::none();
} else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return None(); // Try again later.
} else if (code != ZOK) {
return Error(
"Failed to get data for ephemeral node '" + path +
"' in ZooKeeper: " + zk->message(code));
}
return Some(result);
}
Try<bool> GroupProcess::cache()
{
// Invalidate first (if it's not already).
memberships = None();
// Get all children to determine current memberships.
vector<string> results;
int code = zk->getChildren(znode, true, &results); // Sets the watch!
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return false;
} else if (code != ZOK) {
return Error("Non-retryable error attempting to get children of '" + znode
+ "' in ZooKeeper: " + zk->message(code));
}
// Convert results to sequence numbers and (optionally) labels.
hashmap<int32_t, Option<string>> sequences;
foreach (const string& result, results) {
vector<string> tokens = strings::tokenize(result, "_");
Option<string> label = None();
if (tokens.size() > 1) {
label = tokens[0];
}
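// E.g., "label_0000000131" yields the label "label" and the
// sequence token "0000000131", whereas a bare "0000000131" yields
// no label.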
Try<int32_t> sequence = numify<int32_t>(tokens.back());
// Skip it if it couldn't be converted to a number.
// NOTE: This is currently possible when using a replicated log
// based registry because the log replicas register under
// "/log_replicas" at the same path as the masters' ephemeral
// znodes.
if (sequence.isError()) {
VLOG(1) << "Found non-sequence node '" << result
<< "' at '" << znode << "' in ZooKeeper";
continue;
}
sequences[sequence.get()] = label;
}
// Cache current memberships, cancelling those that are now missing.
set<Group::Membership> current;
foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
if (!sequences.contains(sequence)) {
cancelled->set(false);
owned.erase(sequence); // Okay since iterating over a copy.
delete cancelled;
} else {
current.insert(Group::Membership(
sequence, sequences[sequence], cancelled->future()));
sequences.erase(sequence);
}
}
foreachpair (int32_t sequence,
Promise<bool>* cancelled,
utils::copy(unowned)) {
if (!sequences.contains(sequence)) {
cancelled->set(false);
unowned.erase(sequence); // Okay since iterating over a copy.
delete cancelled;
} else {
current.insert(Group::Membership(
sequence, sequences[sequence], cancelled->future()));
sequences.erase(sequence);
}
}
// Add any remaining (i.e., unexpected) sequences.
foreachpair (int32_t sequence, const Option<string>& label, sequences) {
Promise<bool>* cancelled = new Promise<bool>();
unowned[sequence] = cancelled;
current.insert(Group::Membership(sequence, label, cancelled->future()));
}
memberships = current;
return true;
}
void GroupProcess::update()
{
CHECK_SOME(memberships);
const size_t size = pending.watches.size();
for (size_t i = 0; i < size; i++) {
Watch* watch = pending.watches.front();
if (memberships.get() != watch->expected) {
watch->promise.set(memberships.get());
pending.watches.pop();
delete watch;
} else {
// Don't delete the watch, but push it to the back of the queue.
pending.watches.push(watch);
pending.watches.pop();
}
}
}
Try<bool> GroupProcess::sync()
{
LOG(INFO)
<< "Syncing group operations: queue size (joins, cancels, datas) = ("
<< pending.joins.size() << ", " << pending.cancels.size() << ", "
<< pending.datas.size() << ")";
// The state may be CONNECTED or AUTHENTICATED if Group setup has
// not finished.
CHECK(state == CONNECTED || state == AUTHENTICATED || state == READY)
<< state;
// Authenticate with ZK if not already authenticated.
if (state == CONNECTED) {
Try<bool> authenticated = authenticate();
if (authenticated.isError() || !authenticated.get()) {
return authenticated;
}
}
// Create group base path if not already created.
if (state == AUTHENTICATED) {
Try<bool> created = create();
if (created.isError() || !created.get()) {
return created;
}
}
// Do joins.
while (!pending.joins.empty()) {
Join* join = pending.joins.front();
Result<Group::Membership> membership = doJoin(join->data, join->label);
if (membership.isNone()) {
return false; // Try again later.
} else if (membership.isError()) {
join->promise.fail(membership.error());
} else {
join->promise.set(membership.get());
}
pending.joins.pop();
delete join;
}
// Do cancels.
while (!pending.cancels.empty()) {
Cancel* cancel = pending.cancels.front();
Result<bool> cancellation = doCancel(cancel->membership);
if (cancellation.isNone()) {
return false; // Try again later.
} else if (cancellation.isError()) {
cancel->promise.fail(cancellation.error());
} else {
cancel->promise.set(cancellation.get());
}
pending.cancels.pop();
delete cancel;
}
// Do datas.
while (!pending.datas.empty()) {
Data* data = pending.datas.front();
// TODO(benh): Ignore if future has been discarded?
Result<Option<string>> result = doData(data->membership);
if (result.isNone()) {
return false; // Try again later.
} else if (result.isError()) {
data->promise.fail(result.error());
} else {
data->promise.set(result.get());
}
pending.datas.pop();
delete data;
}
// Get cache of memberships if we don't have one. Note that we do
// this last because any joins or cancels above will invalidate our
// cache, so it would be nice to get it validated again at the
// end. The side-effect here is that users will learn of joins and
// cancels first through any explicit futures for them rather than
// watches.
if (memberships.isNone()) {
Try<bool> cached = cache();
if (cached.isError() || !cached.get()) {
CHECK_NONE(memberships);
return cached;
} else {
update(); // Update any pending watches.
}
}
return true;
}
void GroupProcess::retry(const Duration& duration)
{
if (!retrying) {
// Retry could be cancelled before it is scheduled.
return;
}
// We cancel the retries when the group aborts and when its ZK
// session expires so 'retrying' should be false in the condition
// check above.
CHECK_NONE(error);
// In order to be retrying, we should be at least CONNECTED.
CHECK(state == CONNECTED || state == AUTHENTICATED || state == READY)
<< state;
// Will reset it to true if another retry is necessary.
retrying = false;
Try<bool> synced = sync();
if (synced.isError()) {
// Non-retryable error. Abort.
abort(synced.error());
} else if (!synced.get()) {
// Backoff and keep retrying.
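// Delays double on each retry (2s, 4s, 8s, ...) and are capped at
// 60 seconds.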
retrying = true;
Seconds seconds = std::min(duration * 2, Duration(Seconds(60)));
delay(seconds, self(), &GroupProcess::retry, seconds);
}
}
void GroupProcess::abort(const string& message)
{
// Set the error variable so that the group becomes non-functional.
error = Error(message);
LOG(ERROR) << "Group aborting: " << message;
// Cancel the retries.
retrying = false;
fail(&pending.joins, message);
fail(&pending.cancels, message);
fail(&pending.datas, message);
fail(&pending.watches, message);
// Set all owned memberships as cancelled.
foreachvalue (Promise<bool>* cancelled, owned) {
cancelled->set(false); // Since this was not requested.
delete cancelled;
}
owned.clear();
// Since we decided to abort, we expire the session to clean up
// ephemeral ZNodes as necessary.
delete CHECK_NOTNULL(zk);
delete CHECK_NOTNULL(watcher);
zk = nullptr;
watcher = nullptr;
}
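// Returns the znode basename for a membership, e.g., "0000000131",
// or "<label>_0000000131" for labeled memberships.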
string GroupProcess::zkBasename(const Group::Membership& membership)
{
Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
CHECK_SOME(sequence);
return membership.label_.isSome()
? (membership.label_.get() + "_" + sequence.get())
: sequence.get();
}
Group::Group(const string& servers,
const Duration& sessionTimeout,
const string& znode,
const Option<Authentication>& auth)
{
process = new GroupProcess(servers, sessionTimeout, znode, auth);
spawn(process);
}
Group::Group(const URL& url,
const Duration& sessionTimeout)
{
process = new GroupProcess(url, sessionTimeout);
spawn(process);
}
Group::~Group()
{
terminate(process);
wait(process);
delete process;
}
Future<Group::Membership> Group::join(
const string& data,
const Option<string>& label)
{
return dispatch(process, &GroupProcess::join, data, label);
}
Future<bool> Group::cancel(const Group::Membership& membership)
{
return dispatch(process, &GroupProcess::cancel, membership);
}
Future<Option<string>> Group::data(const Group::Membership& membership)
{
return dispatch(process, &GroupProcess::data, membership);
}
Future<set<Group::Membership>> Group::watch(
const set<Group::Membership>& expected)
{
return dispatch(process, &GroupProcess::watch, expected);
}
Future<Option<int64_t>> Group::session()
{
return dispatch(process, &GroupProcess::session);
}
} // namespace zookeeper {