blob: 83505830fcb23f65db081bac774fddfd15d10c5c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __MESOS_ZOOKEEPER_GROUP_HPP__
#define __MESOS_ZOOKEEPER_GROUP_HPP__
#include <cstdint>
#include <map>
#include <queue>
#include <set>
#include <string>

#include <mesos/zookeeper/authentication.hpp>
#include <mesos/zookeeper/url.hpp>

#include <process/future.hpp>
#include <process/process.hpp>
#include <process/timer.hpp>

#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/result.hpp>
#include <stout/try.hpp>
// Forward declarations.
class Watcher;
class ZooKeeper;
namespace zookeeper {
// Forward declaration.
class GroupProcess;
// Represents a distributed group managed by ZooKeeper. A group is
// associated with a specific ZooKeeper path, and members are
// represented by ephemeral sequential nodes.
class Group
{
public:
// Represents a group membership. Note that we order memberships by
// membership id (that is, an older membership is ordered before a
// younger membership). In addition, we do not use the "cancelled"
// future to compare memberships so that two memberships created
// from different Group instances will still be considered the same.
struct Membership
{
bool operator==(const Membership& that) const
{
return sequence == that.sequence;
}
bool operator!=(const Membership& that) const
{
return sequence != that.sequence;
}
bool operator<(const Membership& that) const
{
return sequence < that.sequence;
}
bool operator<=(const Membership& that) const
{
return sequence <= that.sequence;
}
bool operator>(const Membership& that) const
{
return sequence > that.sequence;
}
bool operator>=(const Membership& that) const
{
return sequence >= that.sequence;
}
int32_t id() const
{
return sequence;
}
Option<std::string> label() const
{
return label_;
}
// Returns a future that is only satisfied once this membership
// has been cancelled. In which case, the value of the future is
// true if you own this membership and cancelled it by invoking
// Group::cancel. Otherwise, the value of the future is false (and
// could signify cancellation due to a session expiration or
// operator error).
process::Future<bool> cancelled() const
{
return cancelled_;
}
private:
friend class GroupProcess; // Creates and manages memberships.
Membership(int32_t _sequence,
const Option<std::string>& _label,
const process::Future<bool>& cancelled)
: sequence(_sequence), label_(_label), cancelled_(cancelled) {}
const int32_t sequence;
const Option<std::string> label_;
process::Future<bool> cancelled_;
};
// Constructs this group using the specified ZooKeeper servers (list
// of host:port) with the given session timeout at the specified znode.
Group(const std::string& servers,
const Duration& sessionTimeout,
const std::string& znode,
const Option<Authentication>& auth = None());
Group(const URL& url,
const Duration& sessionTimeout);
~Group();
// Returns the result of trying to join a "group" in ZooKeeper.
// If "label" is provided the newly created znode contains "label_"
// as the prefix. If join is successful, an "owned" membership will
// be returned whose retrievable data will be a copy of the
// specified parameter. A membership is not "renewed" in the event
// of a ZooKeeper session expiration. Instead, a client should watch
// the group memberships and rejoin the group as appropriate.
process::Future<Membership> join(
const std::string& data,
const Option<std::string>& label = None());
// Returns the result of trying to cancel a membership. Note that
// only memberships that are "owned" (see join) can be canceled.
process::Future<bool> cancel(const Membership& membership);
// Returns the result of trying to fetch the data associated with a
// group membership.
// A None is returned if the specified membership doesn't exist,
// e.g., it can be removed before this call can read it content.
process::Future<Option<std::string>> data(const Membership& membership);
// Returns a future that gets set when the group memberships differ
// from the "expected" memberships specified.
process::Future<std::set<Membership>> watch(
const std::set<Membership>& expected = std::set<Membership>());
// Returns the current ZooKeeper session associated with this group,
// or none if no session currently exists.
process::Future<Option<int64_t>> session();
// Made public for testing purposes.
GroupProcess* process;
};
// The libprocess actor behind Group: Group holds a pointer to one of
// these ('Group::process') and the public operations below mirror
// Group's API. The ZooKeeper client, connection state and all pending
// (not yet completed) operations are kept here.
class GroupProcess : public process::Process<GroupProcess>
{
public:
// Same arguments as the corresponding Group constructors.
GroupProcess(const std::string& servers,
const Duration& sessionTimeout,
const std::string& znode,
const Option<Authentication>& auth);
GroupProcess(const URL& url,
const Duration& sessionTimeout);
~GroupProcess() override;
void initialize() override;
// Delay used between retries of pending operations (presumably the
// argument handed to retry(); the value is defined in the .cpp).
static const Duration RETRY_INTERVAL;
// Helper function that returns the basename of the znode of
// the membership.
static std::string zkBasename(const Group::Membership& membership);
// Group implementation.
// Each of these backs the Group method of the same name; see Group
// for the contract of the returned futures.
process::Future<Group::Membership> join(
const std::string& data,
const Option<std::string>& label);
process::Future<bool> cancel(const Group::Membership& membership);
process::Future<Option<std::string>> data(
const Group::Membership& membership);
process::Future<std::set<Group::Membership>> watch(
const std::set<Group::Membership>& expected);
process::Future<Option<int64_t>> session();
// ZooKeeper events.
// Note that events from previous sessions are dropped.
// Each callback carries the id of the session that produced the event
// (presumably delivered via 'watcher' -- NOTE(review): confirm).
void connected(int64_t sessionId, bool reconnect);
void reconnecting(int64_t sessionId);
void expired(int64_t sessionId);
void updated(int64_t sessionId, const std::string& path);
void created(int64_t sessionId, const std::string& path);
void deleted(int64_t sessionId, const std::string& path);
private:
// Presumably (re)starts a ZooKeeper connection attempt and arms
// 'connectTimer'; see the .cpp.
void startConnection();
// Perform join/cancel/data directly against ZooKeeper. The Result
// distinguishes some/none/error. NOTE(review): presumably 'none'
// means a retryable failure, as with the Try<bool> helpers below;
// confirm in the .cpp.
Result<Group::Membership> doJoin(
const std::string& data,
const Option<std::string>& label);
Result<bool> doCancel(const Group::Membership& membership);
Result<Option<std::string>> doData(const Group::Membership& membership);
// Returns true if authentication is successful, false if the
// failure is retryable and Error otherwise.
Try<bool> authenticate();
// Creates the group (which means creating its base path) on ZK.
// Returns true if successful, false if the failure is retryable
// and Error otherwise.
Try<bool> create();
// Attempts to cache the current set of memberships.
// Returns true if successful, false if the failure is retryable
// and Error otherwise.
Try<bool> cache();
// Synchronizes pending operations with ZooKeeper and also attempts
// to cache the current set of memberships if necessary.
// Returns true if successful, false if the failure is retryable
// and Error otherwise.
Try<bool> sync();
// Updates any pending watches.
void update();
// Generic retry method. This mechanism is "generic" in the sense
// that it is not specific to any particular operation, but rather
// attempts to perform all pending operations (including caching
// memberships if necessary).
void retry(const Duration& duration);
// Presumably invoked when 'connectTimer' fires before the connection
// attempt for 'sessionId' succeeds; see the .cpp.
void timedout(int64_t sessionId);
// Aborts the group instance and fails all pending operations.
// The group then enters an error state and all subsequent
// operations will fail as well.
void abort(const std::string& message);
// NOTE: the declaration order of the data members below is also their
// initialization order; keep it in sync with the constructors' init
// lists in the .cpp.
// Potential non-retryable error set by abort().
Option<Error> error;
// ZooKeeper servers as passed to the constructor (a list of
// host:port, per the Group constructor documentation).
const std::string servers;
// The session timeout requested by the client.
const Duration sessionTimeout;
// The ZooKeeper path (base path) at which the group lives.
const std::string znode;
Option<Authentication> auth; // ZooKeeper authentication.
const ACL_vector acl; // Default ACL to use.
// ZooKeeper client and its watcher. Raw pointers whose creation and
// destruction are handled in the .cpp.
Watcher* watcher;
ZooKeeper* zk;
// Group connection state.
// Normal state transitions:
// DISCONNECTED -> CONNECTING -> CONNECTED -> AUTHENTICATED
// -> READY.
// Reconnection does not change the current state and the state is
// only reset to DISCONNECTED after session expiration. Therefore
// the client's "progress" in setting up the group is preserved
// across reconnections. This means authenticate() and create() are
// only successfully executed once in one ZooKeeper session.
enum State
{
DISCONNECTED, // The initial state.
CONNECTING, // ZooKeeper connecting.
CONNECTED, // ZooKeeper connected but before group setup.
AUTHENTICATED, // ZooKeeper connected and authenticated.
READY, // ZooKeeper connected, session authenticated and
// base path for the group created.
} state;
// A queued join() request; 'promise' yields the resulting membership.
struct Join
{
Join(const std::string& _data, const Option<std::string>& _label)
: data(_data), label(_label) {}
std::string data;
const Option<std::string> label;
process::Promise<Group::Membership> promise;
};
// A queued cancel() request for 'membership'.
struct Cancel
{
explicit Cancel(const Group::Membership& _membership)
: membership(_membership) {}
Group::Membership membership;
process::Promise<bool> promise;
};
// A queued data() request for 'membership'.
struct Data
{
explicit Data(const Group::Membership& _membership)
: membership(_membership) {}
Group::Membership membership;
process::Promise<Option<std::string>> promise;
};
// A queued watch() request; 'expected' is the membership set the
// caller already knows about.
struct Watch
{
explicit Watch(const std::set<Group::Membership>& _expected)
: expected(_expected) {}
std::set<Group::Membership> expected;
process::Promise<std::set<Group::Membership>> promise;
};
// Operations that have not completed yet, one FIFO queue per kind
// (see sync() and update()). Entries are raw pointers; presumably
// deleted by the .cpp once their promise has been completed.
struct {
std::queue<Join*> joins;
std::queue<Cancel*> cancels;
std::queue<Data*> datas;
std::queue<Watch*> watches;
} pending;
// Indicates there is a pending delayed retry.
bool retrying;
// Expected ZooKeeper sequence numbers (either owned/created by this
// group instance or not) and the promise we associate with their
// "cancellation" (i.e., no longer part of the group).
std::map<int32_t, process::Promise<bool>*> owned;
std::map<int32_t, process::Promise<bool>*> unowned;
// Cache of owned + unowned, where 'None' represents an invalid
// cache and 'Some' represents a valid cache.
Option<std::set<Group::Membership>> memberships;
// A timer that controls when we should give up on waiting for the
// current connection attempt to succeed and try to reconnect.
Option<process::Timer> connectTimer;
};
} // namespace zookeeper {
#endif // __MESOS_ZOOKEEPER_GROUP_HPP__