blob: 738210f4eda4a5ca2ef86409520615ad1812cb12 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <map>
#include <queue>
#include <set>
#include <string>

#include "process/future.hpp"
#include "process/timer.hpp"
#include "process/process.hpp"

#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/result.hpp>
#include <stout/try.hpp>

#include "zookeeper/authentication.hpp"
#include "zookeeper/url.hpp"
// Forward declarations (at global scope, outside namespace zookeeper).
// They are only used through pointers (GroupProcess::watcher and
// GroupProcess::zk), so the full definitions are not needed here.
class Watcher;
class ZooKeeper;
namespace zookeeper {
// Forward declaration: Group refers to GroupProcess (e.g. through its
// 'process' pointer member) before GroupProcess is defined.
class GroupProcess;
// Represents a distributed group managed by ZooKeeper. A group is
// associated with a specific ZooKeeper path, and members are
// represented by ephemeral sequential nodes.
class Group
// Represents a group membership. Note that we order memberships by
// membership id (that is, an older membership is ordered before a
// younger membership). In addition, we do not use the "cancelled"
// future to compare memberships so that two memberships created
// from different Group instances will still be considered the same.
struct Membership
bool operator==(const Membership& that) const
return sequence == that.sequence;
bool operator!=(const Membership& that) const
return sequence != that.sequence;
bool operator<(const Membership& that) const
return sequence < that.sequence;
bool operator<=(const Membership& that) const
return sequence <= that.sequence;
bool operator>(const Membership& that) const
return sequence > that.sequence;
bool operator>=(const Membership& that) const
return sequence >= that.sequence;
int32_t id() const
return sequence;
Option<std::string> label() const
return label_;
// Returns a future that is only satisfied once this membership
// has been cancelled. In which case, the value of the future is
// true if you own this membership and cancelled it by invoking
// Group::cancel. Otherwise, the value of the future is false (and
// could signify cancellation due to a sesssion expiration or
// operator error).
process::Future<bool> cancelled() const
return cancelled_;
friend class GroupProcess; // Creates and manages memberships.
Membership(int32_t _sequence,
const Option<std::string>& _label,
const process::Future<bool>& cancelled)
: sequence(_sequence), label_(_label), cancelled_(cancelled) {}
const int32_t sequence;
const Option<std::string> label_;
process::Future<bool> cancelled_;
// Constructs this group using the specified ZooKeeper servers (list
// of host:port) with the given timeout at the specified znode.
Group(const std::string& servers,
const Duration& timeout,
const std::string& znode,
const Option<Authentication>& auth = None());
Group(const URL& url,
const Duration& timeout);
// Returns the result of trying to join a "group" in ZooKeeper.
// If "label" is provided the newly created znode contains "label_"
// as the prefix. If join is successful, an "owned" membership will
// be returned whose retrievable data will be a copy of the
// specified parameter. A membership is not "renewed" in the event
// of a ZooKeeper session expiration. Instead, a client should watch
// the group memberships and rejoin the group as appropriate.
process::Future<Membership> join(
const std::string& data,
const Option<std::string>& label = None());
// Returns the result of trying to cancel a membership. Note that
// only memberships that are "owned" (see join) can be canceled.
process::Future<bool> cancel(const Membership& membership);
// Returns the result of trying to fetch the data associated with a
// group membership.
// A None is returned if the specified membership doesn't exist,
// e.g., it can be removed before this call can read it content.
process::Future<Option<std::string> > data(const Membership& membership);
// Returns a future that gets set when the group memberships differ
// from the "expected" memberships specified.
process::Future<std::set<Membership> > watch(
const std::set<Membership>& expected = std::set<Membership>());
// Returns the current ZooKeeper session associated with this group,
// or none if no session currently exists.
process::Future<Option<int64_t> > session();
// Made public for testing purposes.
GroupProcess* process;
// The libprocess actor that implements Group. The Group-facing API and
// the ZooKeeper event callbacks are public; the helpers that talk to
// ZooKeeper and the connection/operation state are private.
class GroupProcess : public process::Process<GroupProcess>
{
public:
  GroupProcess(const std::string& servers,
               const Duration& timeout,
               const std::string& znode,
               const Option<Authentication>& auth);

  GroupProcess(const URL& url,
               const Duration& timeout);

  virtual ~GroupProcess();

  virtual void initialize();

  // NOTE(review): presumably the delay used when scheduling retry();
  // confirm in the implementation file.
  static const Duration RETRY_INTERVAL;

  // Helper function that returns the basename of the znode of
  // the membership.
  static std::string zkBasename(const Group::Membership& membership);

  // Group implementation.
  process::Future<Group::Membership> join(
      const std::string& data,
      const Option<std::string>& label);
  process::Future<bool> cancel(const Group::Membership& membership);
  process::Future<Option<std::string> > data(
      const Group::Membership& membership);
  process::Future<std::set<Group::Membership> > watch(
      const std::set<Group::Membership>& expected);
  process::Future<Option<int64_t> > session();

  // ZooKeeper events.
  // Note that events from previous sessions are dropped.
  void connected(int64_t sessionId, bool reconnect);
  void reconnecting(int64_t sessionId);
  void expired(int64_t sessionId);
  void updated(int64_t sessionId, const std::string& path);
  void created(int64_t sessionId, const std::string& path);
  void deleted(int64_t sessionId, const std::string& path);

private:
  // Synchronous workers behind join/cancel/data. They return a
  // Result, which can carry a value, None, or an Error.
  Result<Group::Membership> doJoin(
      const std::string& data,
      const Option<std::string>& label);
  Result<bool> doCancel(const Group::Membership& membership);
  Result<Option<std::string> > doData(const Group::Membership& membership);

  // Returns true if authentication is successful, false if the
  // failure is retryable and Error otherwise.
  Try<bool> authenticate();

  // Creates the group (which means creating its base path) on ZK.
  // Returns true if successful, false if the failure is retryable
  // and Error otherwise.
  Try<bool> create();

  // Attempts to cache the current set of memberships.
  // Returns true if successful, false if the failure is retryable
  // and Error otherwise.
  Try<bool> cache();

  // Synchronizes pending operations with ZooKeeper and also attempts
  // to cache the current set of memberships if necessary.
  // Returns true if successful, false if the failure is retryable
  // and Error otherwise.
  Try<bool> sync();

  // Updates any pending watches.
  void update();

  // Generic retry method. This mechanism is "generic" in the sense
  // that it is not specific to any particular operation, but rather
  // attempts to perform all pending operations (including caching
  // memberships if necessary).
  void retry(const Duration& duration);

  // Invoked when the timer (see 'timer' below) fires for the given
  // session.
  void timedout(int64_t sessionId);

  // Aborts the group instance and fails all pending operations.
  // The group then enters an error state and all subsequent
  // operations will fail as well.
  void abort(const std::string& message);

  // Potential non-retryable error set by abort().
  Option<Error> error;

  const std::string servers;

  // The session timeout requested by the client.
  const Duration timeout;

  const std::string znode;

  Option<Authentication> auth; // ZooKeeper authentication.

  const ACL_vector acl; // Default ACL to use.

  Watcher* watcher;
  ZooKeeper* zk;

  // Group connection state.
  // Normal state transitions:
  //   DISCONNECTED -> CONNECTING -> CONNECTED -> AUTHENTICATED
  //   -> READY.
  // Reconnection does not change the current state and the state is
  // only reset to DISCONNECTED after session expiration. Therefore
  // the client's "progress" in setting up the group is preserved
  // across reconnections. This means authenticate() and create() are
  // only successfully executed once in one ZooKeeper session.
  enum State
  {
    DISCONNECTED,  // The initial state.
    CONNECTING,    // ZooKeeper connecting.
    CONNECTED,     // ZooKeeper connected but before group setup.
    AUTHENTICATED, // ZooKeeper connected and authenticated.
    READY,         // ZooKeeper connected, session authenticated and
                   // base path for the group created.
  } state;

  // A pending join operation and the promise completed by it.
  struct Join
  {
    Join(const std::string& _data, const Option<std::string>& _label)
      : data(_data), label(_label) {}

    std::string data;
    const Option<std::string> label;
    process::Promise<Group::Membership> promise;
  };

  // A pending cancel operation and the promise completed by it.
  struct Cancel
  {
    explicit Cancel(const Group::Membership& _membership)
      : membership(_membership) {}

    Group::Membership membership;
    process::Promise<bool> promise;
  };

  // A pending data fetch and the promise completed by it.
  struct Data
  {
    explicit Data(const Group::Membership& _membership)
      : membership(_membership) {}

    Group::Membership membership;
    process::Promise<Option<std::string> > promise;
  };

  // A pending watch and the promise completed by it.
  struct Watch
  {
    explicit Watch(const std::set<Group::Membership>& _expected)
      : expected(_expected) {}

    std::set<Group::Membership> expected;
    process::Promise<std::set<Group::Membership> > promise;
  };

  // Operations waiting to be performed against ZooKeeper, in FIFO
  // order per kind. NOTE(review): entries are raw pointers, presumably
  // heap-allocated and deleted by this process once their promise is
  // completed -- confirm in the implementation file.
  struct {
    std::queue<Join*> joins;
    std::queue<Cancel*> cancels;
    std::queue<Data*> datas;
    std::queue<Watch*> watches;
  } pending;

  // Indicates there is a pending delayed retry.
  bool retrying;

  // Expected ZooKeeper sequence numbers (either owned/created by this
  // group instance or not) and the promise we associate with their
  // "cancellation" (i.e., no longer part of the group).
  std::map<int32_t, process::Promise<bool>*> owned;
  std::map<int32_t, process::Promise<bool>*> unowned;

  // Cache of owned + unowned, where 'None' represents an invalid
  // cache and 'Some' represents a valid cache.
  Option<std::set<Group::Membership> > memberships;

  // The timer that determines whether we should quit waiting for the
  // connection to be restored.
  Option<process::Timer> timer;
};
} // namespace zookeeper {