blob: 267c38a2922f114519ffaf4f0bdce74d22fc1506 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <glog/logging.h>
#include <iostream>
#include <map>
#include <boost/tuple/tuple.hpp>
#include <process/dispatch.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
#include <stout/fatal.hpp>
#include <stout/foreach.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include "zookeeper/zookeeper.hpp"
using boost::tuple;
using process::Future;
using process::PID;
using process::Process;
using process::Promise;
using std::map;
using std::string;
using std::vector;
// Singleton instance of WatcherProcessManager. It is created and
// spawned by the first Watcher constructed (see Watcher::Watcher) and
// is never deleted. Internal linkage keeps this file-local pointer from
// colliding with any other global symbol named 'manager' at link time.
class WatcherProcessManager;
static WatcherProcessManager* manager = NULL;
// In order to make callbacks on Watcher, we create a proxy
// WatcherProcess. The ZooKeeperImpl (defined below) dispatches
// "events" to the WatcherProcess which then invokes
// Watcher::process. The major benefit of this approach is that a
// WatcherProcess lifetime can precisely match the lifetime of a
// Watcher, so the ZooKeeperImpl won't end up calling into an object
// that has been deleted. In the worst case, the ZooKeeperImpl will
// dispatch to a dead WatcherProcess, which will just get dropped on
// the floor. In addition, the callbacks in the Watcher can manipulate
// the ZooKeeper object freely, calling delete on it if necessary
// (e.g., after a session expiration). We wanted to keep the Watcher
// interface clean and simple, so rather than add a member in Watcher
// that points to a WatcherProcess instance (or points to a
// WatcherImpl), we choose to create a WatcherProcessManager that
// stores the Watcher and WatcherProcess associations. The
// WatcherProcessManager is akin to having a shared dictionary or
// hashtable and using locks to access it rather than sending and
// receiving messages. There is probably a performance hit here, but
// it would be interesting to see how bad the performance is across a
// range of low and high-contention states.
// Proxy process that delivers ZooKeeper events to exactly one Watcher.
// ZooKeeperImpl dispatches events to this process, which then invokes
// Watcher::process from within this process's context.
class WatcherProcess : public Process<WatcherProcess>
{
public:
  // 'explicit' prevents an accidental implicit Watcher* -> WatcherProcess
  // conversion. The watcher is not owned by this process.
  explicit WatcherProcess(Watcher* watcher) : watcher(watcher) {}

  // Forwards a single ZooKeeper event to the wrapped Watcher.
  void event(ZooKeeper* zk, int type, int state, const string& path)
  {
    watcher->process(zk, type, state, path);
  }

private:
  // Not owned. WatcherProcessManager::destroy (called from
  // Watcher::~Watcher) terminates and deletes this process.
  Watcher* watcher;
};
// Owns every WatcherProcess and records which Watcher each one serves.
// Every call site in this file reaches these methods through
// process::dispatch, so the map is only touched from this process's
// own context.
class WatcherProcessManager : public Process<WatcherProcessManager>
{
public:
  // Creates and spawns a WatcherProcess for 'watcher', registers it,
  // and returns it. The manager retains ownership of the process.
  WatcherProcess* create(Watcher* watcher)
  {
    WatcherProcess* created = new WatcherProcess(watcher);
    spawn(created);
    processes[watcher] = created;
    return created;
  }

  // Unregisters the WatcherProcess for 'watcher', terminates it, waits
  // for it to exit, and deletes it. Returns false if none was registered.
  bool destroy(Watcher* watcher)
  {
    // A single lookup instead of count() followed by operator[].
    map<Watcher*, WatcherProcess*>::iterator it = processes.find(watcher);
    if (it == processes.end()) {
      return false;
    }

    WatcherProcess* doomed = it->second;
    processes.erase(it);
    process::terminate(doomed->self());
    process::wait(doomed->self());
    delete doomed;
    return true;
  }

  // Returns the PID of the WatcherProcess for 'watcher', or a
  // default-constructed PID if none is registered.
  PID<WatcherProcess> lookup(Watcher* watcher)
  {
    map<Watcher*, WatcherProcess*>::iterator it = processes.find(watcher);
    if (it != processes.end()) {
      return it->second->self();
    }
    return PID<WatcherProcess>();
  }

private:
  // Watcher -> owned proxy process.
  map<Watcher*, WatcherProcess*> processes;
};
// Registers this Watcher with the WatcherProcessManager, lazily creating
// the manager on first use, and aborts if no WatcherProcess is returned.
Watcher::Watcher()
{
  // Exactly one thread wins the compare-and-swap on 'initialized' and
  // creates the manager. Every other thread spins on 'initializing'
  // until the winner has published 'manager'.
  static volatile bool initialized = false;
  static volatile bool initializing = true;

  if (!initialized) {
    if (__sync_bool_compare_and_swap(&initialized, false, true)) {
      manager = new WatcherProcessManager();
      process::spawn(manager);

      // 'volatile' does not order memory between threads. This full
      // barrier makes the write to 'manager' visible before any thread
      // can observe 'initializing == false'.
      __sync_synchronize();
      initializing = false;
    }
  }

  while (initializing);

  // Pairs with the barrier above: the read of 'manager' below must not
  // be satisfied before 'initializing' was observed to be false.
  __sync_synchronize();

  WatcherProcess* process =
    process::dispatch(manager->self(),
                      &WatcherProcessManager::create,
                      this).get();

  if (process == NULL) {
    fatal("failed to initialize Watcher");
  }
}
// Asks the manager to tear down this Watcher's proxy WatcherProcess and
// blocks until that has finished, so the proxy is gone before this
// object's storage is released.
Watcher::~Watcher()
{
  Future<bool> destroyed =
    process::dispatch(manager->self(), &WatcherProcessManager::destroy, this);
  destroyed.await();
}
// Implementation behind ZooKeeper (pImpl). Wraps a zhandle_t from the
// ZooKeeper C client. Every operation issues the matching asynchronous
// C API call and returns a Future<int>. One of the static completion
// callbacks below later completes that future with the ZooKeeper return
// code. Events from the C client are forwarded to the WatcherProcess of
// the Watcher given at construction.
class ZooKeeperImpl
{
public:
  // Starts a session against 'servers' with the given session 'timeout'.
  // Aborts the process if 'watcher' is NULL or zookeeper_init fails.
  ZooKeeperImpl(ZooKeeper* zk,
                const string& servers,
                const Duration& timeout,
                Watcher* watcher)
    : servers(servers),
      timeout(timeout),
      zk(zk),
      watcher(watcher)
  {
    if (watcher == NULL) {
      LOG(FATAL) << "Cannot instantiate ZooKeeper with NULL watcher";
    }

    // Lookup PID of the WatcherProcess associated with the Watcher.
    pid = process::dispatch(manager->self(),
                            &WatcherProcessManager::lookup,
                            watcher).get();

    // N.B. The Watcher and thus WatcherProcess may already be gone,
    // in which case, each dispatch to the WatcherProcess that we do
    // will just get dropped on the floor.

    // TODO(benh): Link with WatcherProcess PID?

    zh = zookeeper_init(
        servers.c_str(),
        event,
        static_cast<int>(timeout.ms()),
        NULL,
        this,
        0);

    if (zh == NULL) {
      PLOG(FATAL) << "Failed to create ZooKeeper, zookeeper_init";
    }
  }

  // Closes the handle. Aborts if zookeeper_close reports an error.
  ~ZooKeeperImpl()
  {
    int ret = zookeeper_close(zh);
    if (ret != ZOK) {
      LOG(FATAL) << "Failed to cleanup ZooKeeper, zookeeper_close: "
                 << zerror(ret);
    }
  }

  // Adds authentication credentials (zoo_add_auth). If the C call is
  // rejected up front, the returned future already holds its code.
  Future<int> authenticate(const string& scheme, const string& credentials)
  {
    Promise<int>* promise = new Promise<int>();
    Future<int> future = promise->future();

    tuple<Promise<int>*>* args = new tuple<Promise<int>*>(promise);

    int ret = zoo_add_auth(zh, scheme.c_str(), credentials.data(),
                           credentials.size(), voidCompletion, args);

    if (ret != ZOK) {
      delete promise;
      delete args;
      return ret;
    }

    return future;
  }

  // Creates 'path' (zoo_acreate). On success, writes the created path
  // into 'result' if it is non-NULL.
  Future<int> create(const string& path, const string& data,
                     const ACL_vector& acl, int flags, string* result)
  {
    Promise<int>* promise = new Promise<int>();
    Future<int> future = promise->future();

    tuple<Promise<int>*, string*>* args =
      new tuple<Promise<int>*, string*>(promise, result);

    int ret = zoo_acreate(zh, path.c_str(), data.data(), data.size(), &acl,
                          flags, stringCompletion, args);

    if (ret != ZOK) {
      delete promise;
      delete args;
      return ret;
    }

    return future;
  }

  // Deletes 'path' at 'version' (zoo_adelete).
  Future<int> remove(const string& path, int version)
  {
    Promise<int>* promise = new Promise<int>();
    Future<int> future = promise->future();

    tuple<Promise<int>*>* args = new tuple<Promise<int>*>(promise);

    int ret = zoo_adelete(zh, path.c_str(), version, voidCompletion, args);

    if (ret != ZOK) {
      delete promise;
      delete args;
      return ret;
    }

    return future;
  }

  // Checks whether 'path' exists (zoo_aexists), optionally setting a
  // watch. On success, copies the node's Stat into 'stat' if non-NULL.
  Future<int> exists(const string& path, bool watch, Stat* stat)
  {
    Promise<int>* promise = new Promise<int>();
    Future<int> future = promise->future();

    tuple<Promise<int>*, Stat*>* args =
      new tuple<Promise<int>*, Stat*>(promise, stat);

    int ret = zoo_aexists(zh, path.c_str(), watch, statCompletion, args);

    if (ret != ZOK) {
      delete promise;
      delete args;
      return ret;
    }

    return future;
  }

  // Reads the data of 'path' (zoo_aget), optionally setting a watch.
  // On success, fills 'result' and 'stat' where non-NULL.
  Future<int> get(const string& path, bool watch, string* result, Stat* stat)
  {
    Promise<int>* promise = new Promise<int>();
    Future<int> future = promise->future();

    tuple<Promise<int>*, string*, Stat*>* args =
      new tuple<Promise<int>*, string*, Stat*>(promise, result, stat);

    int ret = zoo_aget(zh, path.c_str(), watch, dataCompletion, args);

    if (ret != ZOK) {
      delete promise;
      delete args;
      return ret;
    }

    return future;
  }

  // Lists the children of 'path' (zoo_aget_children), optionally setting
  // a watch. On success, appends the child names to 'results' if non-NULL.
  Future<int> getChildren(const string& path,
                          bool watch,
                          vector<string>* results)
  {
    Promise<int>* promise = new Promise<int>();
    Future<int> future = promise->future();

    tuple<Promise<int>*, vector<string>*>* args =
      new tuple<Promise<int>*, vector<string>*>(promise, results);

    int ret = zoo_aget_children(zh, path.c_str(), watch, stringsCompletion,
                                args);

    if (ret != ZOK) {
      delete promise;
      delete args;
      return ret;
    }

    return future;
  }

  // Writes 'data' to 'path' at 'version' (zoo_aset). The resulting Stat
  // is discarded.
  Future<int> set(const string& path, const string& data, int version)
  {
    Promise<int>* promise = new Promise<int>();
    Future<int> future = promise->future();

    tuple<Promise<int>*, Stat*>* args =
      new tuple<Promise<int>*, Stat*>(promise, NULL);

    int ret = zoo_aset(zh, path.c_str(), data.data(), data.size(),
                       version, statCompletion, args);

    if (ret != ZOK) {
      delete promise;
      delete args;
      return ret;
    }

    return future;
  }

private:
  // Watcher callback registered with zookeeper_init. 'ctx' is the
  // ZooKeeperImpl passed there. The event is forwarded to the
  // WatcherProcess instead of calling the Watcher directly.
  static void event(zhandle_t* zh, int type, int state,
                    const char* path, void* ctx)
  {
    ZooKeeperImpl* impl = static_cast<ZooKeeperImpl*>(ctx);
    // Building a std::string from a NULL pointer is undefined behavior,
    // so a missing path is treated as empty.
    process::dispatch(impl->pid, &WatcherProcess::event,
                      impl->zk, type, state,
                      string(path != NULL ? path : ""));
  }

  // Completion for calls that return only a status code.
  static void voidCompletion(int ret, const void *data)
  {
    const tuple<Promise<int>*>* args =
      reinterpret_cast<const tuple<Promise<int>*>*>(data);

    Promise<int>* promise = (*args).get<0>();

    promise->set(ret);

    delete promise;
    delete args;
  }

  // Completion for zoo_acreate. 'value' is the path that was created.
  static void stringCompletion(int ret, const char* value, const void* data)
  {
    const tuple<Promise<int>*, string*> *args =
      reinterpret_cast<const tuple<Promise<int>*, string*>*>(data);

    Promise<int>* promise = (*args).get<0>();
    string* result = (*args).get<1>();

    if (ret == 0) {
      if (result != NULL && value != NULL) {
        result->assign(value);
      }
    }

    promise->set(ret);

    delete promise;
    delete args;
  }

  // Completion for zoo_aexists / zoo_aset.
  static void statCompletion(int ret, const Stat* stat, const void* data)
  {
    const tuple<Promise<int>*, Stat*>* args =
      reinterpret_cast<const tuple<Promise<int>*, Stat*>*>(data);

    Promise<int>* promise = (*args).get<0>();
    Stat *stat_result = (*args).get<1>();

    if (ret == 0) {
      if (stat_result != NULL && stat != NULL) {
        *stat_result = *stat;
      }
    }

    promise->set(ret);

    delete promise;
    delete args;
  }

  // Completion for zoo_aget.
  static void dataCompletion(int ret, const char* value, int value_len,
                             const Stat* stat, const void* data)
  {
    const tuple<Promise<int>*, string*, Stat*>* args =
      reinterpret_cast<const tuple<Promise<int>*, string*, Stat*>*>(data);

    Promise<int>* promise = (*args).get<0>();
    string* result = (*args).get<1>();
    Stat* stat_result = (*args).get<2>();

    if (ret == 0) {
      if (result != NULL) {
        // The C client reports a znode holding null data as value == NULL
        // with value_len == -1. Passing -1 to assign() would convert it to
        // a huge size_t, so such nodes (and empty ones) yield "".
        if (value != NULL && value_len > 0) {
          result->assign(value, value_len);
        } else {
          result->clear();
        }
      }

      if (stat_result != NULL && stat != NULL) {
        *stat_result = *stat;
      }
    }

    promise->set(ret);

    delete promise;
    delete args;
  }

  // Completion for zoo_aget_children. Appends to (does not replace) the
  // caller's vector.
  static void stringsCompletion(int ret, const String_vector* values,
                                const void* data)
  {
    const tuple<Promise<int>*, vector<string>*>* args =
      reinterpret_cast<const tuple<Promise<int>*, vector<string>*>*>(data);

    Promise<int>* promise = (*args).get<0>();
    vector<string>* results = (*args).get<1>();

    if (ret == 0) {
      if (results != NULL && values != NULL) {
        results->reserve(results->size() + values->count);
        for (int i = 0; i < values->count; i++) {
          results->push_back(values->data[i]);
        }
      }
    }

    promise->set(ret);

    delete promise;
    delete args;
  }

private:
  // Non-copyable (declared, never defined): the destructor closes 'zh',
  // so a copy would close the same handle twice.
  ZooKeeperImpl(const ZooKeeperImpl&);
  ZooKeeperImpl& operator=(const ZooKeeperImpl&);

  friend class ZooKeeper;

  const string servers; // ZooKeeper host:port pairs.
  const Duration timeout; // ZooKeeper session timeout.
  ZooKeeper* zk; // ZooKeeper instance.
  zhandle_t* zh; // ZooKeeper connection handle.
  Watcher* watcher; // Associated Watcher instance.
  PID<WatcherProcess> pid; // PID of WatcherProcess that invokes Watcher.
};
// Creates the underlying implementation, which immediately starts a
// session against 'servers'.
ZooKeeper::ZooKeeper(const string& servers,
                     const Duration& timeout,
                     Watcher* watcher)
  : impl(new ZooKeeperImpl(this, servers, timeout, watcher))
{}


// Destroying the implementation closes its ZooKeeper handle.
ZooKeeper::~ZooKeeper()
{
  ZooKeeperImpl* doomed = impl;
  delete doomed;
}
// Returns the current connection state reported by zoo_state.
int ZooKeeper::getState()
{
  const int state = zoo_state(impl->zh);
  return state;
}


// Returns the session id held by the C client handle.
int64_t ZooKeeper::getSessionId()
{
  const int64_t sessionId = zoo_client_id(impl->zh)->client_id;
  return sessionId;
}
int ZooKeeper::authenticate(const string& scheme, const string& credentials)
{
return impl->authenticate(scheme, credentials).get();
}
// Creates 'path' and returns a ZooKeeper code. When 'recursive' is true,
// any missing ancestors are first created with empty data, flags 0 and
// the same ACL. Only the final znode receives 'data' and 'flags' and
// reports its created path through 'result'.
int ZooKeeper::create(const string& path, const string& data,
                      const ACL_vector& acl, int flags, string* result,
                      bool recursive)
{
  if (!recursive) {
    return impl->create(path, data, acl, flags, result).get();
  }

  // Do "recursive" create, i.e., ensure intermediate znodes exist.
  string prefix = "/";
  int code = ZOK;
  foreach (const string& token, strings::tokenize(path, "/")) {
    prefix = path::join(prefix, token);

    // Make sure we include 'flags' and 'data' for the final znode.
    if (prefix == path || (prefix + "/") == path) {
      code = impl->create(path, data, acl, flags, result).get();
    } else {
      // Intermediate znodes pass NULL so that 'result' is never left
      // holding a parent's path, e.g. when the final znode already
      // exists and therefore reports nothing back.
      code = impl->create(prefix, "", acl, 0, NULL).get();
    }

    // We fail all non-OK return codes except for:
    // ZNODEEXISTS says the node in the znode path we are trying to
    //   create already exists - this is what we wanted, so we
    //   continue.
    // ZNOAUTH says we can't write the node, but it doesn't tell us
    //   whether the node already exists. We take the optimistic
    //   approach and assume the node's parent doesn't allow us to
    //   write an already existing node (but it exists).
    if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
      return code;
    }
  }

  return code;
}
int ZooKeeper::remove(const string& path, int version)
{
return impl->remove(path, version).get();
}
int ZooKeeper::exists(const string& path, bool watch, Stat* stat)
{
return impl->exists(path, watch, stat).get();
}
int ZooKeeper::get(const string& path, bool watch, string* result, Stat* stat)
{
return impl->get(path, watch, result, stat).get();
}
int ZooKeeper::getChildren(const string& path, bool watch,
vector<string>* results)
{
return impl->getChildren(path, watch, results).get();
}
int ZooKeeper::set(const string& path, const string& data, int version)
{
return impl->set(path, data, version).get();
}
// Returns the C client's human-readable description (zerror) of 'code'.
string ZooKeeper::message(int code) const
{
  const char* text = zerror(code);
  return text;
}
// Returns true for codes that indicate a transient failure (connection
// loss, timeout, session expiry or move) where retrying may succeed, and
// false for every other known code. Aborts on an unknown code.
bool ZooKeeper::retryable(int code)
{
  switch (code) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
    case ZSESSIONEXPIRED:
    case ZSESSIONMOVED:
      return true;

    case ZOK: // No need to retry!

    case ZSYSTEMERROR: // Should not be encountered, here for completeness.
    case ZRUNTIMEINCONSISTENCY:
    case ZDATAINCONSISTENCY:
    case ZMARSHALLINGERROR:
    case ZUNIMPLEMENTED:
    case ZBADARGUMENTS:
    case ZINVALIDSTATE:

    case ZAPIERROR: // Should not be encountered, here for completeness.
    case ZNONODE:
    case ZNOAUTH:
    case ZBADVERSION:
    case ZNOCHILDRENFOREPHEMERALS:
    case ZNODEEXISTS:
    case ZNOTEMPTY:
    case ZINVALIDCALLBACK:
    case ZINVALIDACL:
    case ZAUTHFAILED:
    case ZCLOSING:
    case ZNOTHING: // Is this used? It's not exposed in the Java API.
      return false;

    default:
      LOG(FATAL) << "Unknown ZooKeeper code: " << code;
      // Unreachable because LOG(FATAL) aborts. This return keeps the
      // function from formally falling off the end of a non-void function
      // (and silences the corresponding compiler warning).
      return false;
  }
}