blob: 80e4c928a515d86b33aa1e87bf89cc286594c57d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cppunit/extensions/HelperMacros.h>
#include "CppAssertHelper.h"
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include "CollectionUtil.h"
#include "ThreadingUtil.h"
using namespace Util;
#include "Vector.h"
using namespace std;
#include <cstring>
#include <list>
#include <zookeeper.h>
#include <errno.h>
#include <recordio.h>
#include "Util.h"
#include "ZKMocks.h"
// Test-local view of the record an output archive keeps behind its `priv`
// pointer. testDeserializeString casts `oa->priv` to this type to get at the
// serialized bytes and their length.
// NOTE(review): presumably this has to stay layout-compatible with the private
// buffer struct of the recordio implementation -- confirm before reordering.
struct buff_struct_2 {
int32_t len;
int32_t off;
char *buffer;
};
// TODO(br33d): the vast majority of this test is not usable in single-threaded mode.
// It needs an overhaul to work properly in both threaded and single-threaded
// builds (ZOOKEEPER-2640).
#ifdef THREADED
// Sink for the log-callback tests: each message handed to the callback is
// echoed to stdout and appended to logMessages so tests can count them.
list<string> logMessages;
void logMessageHandler(const char* message) {
    const string text(message);
    cout << "Log Message Received: [" << text << "]" << endl;
    logMessages.push_back(text);
}
// Compares two Stat records field by field.
// Returns 1 when all eleven compared fields are equal, 0 as soon as one differs.
static int Stat_eq(struct Stat* a, struct Stat* b)
{
    const bool identical =
        a->czxid == b->czxid &&
        a->mzxid == b->mzxid &&
        a->ctime == b->ctime &&
        a->mtime == b->mtime &&
        a->version == b->version &&
        a->cversion == b->cversion &&
        a->aversion == b->aversion &&
        a->ephemeralOwner == b->ephemeralOwner &&
        a->dataLength == b->dataLength &&
        a->numChildren == b->numChildren &&
        a->pzxid == b->pzxid;
    return identical ? 1 : 0;
}
#ifdef THREADED
// Threaded build: simply sleeps for `i` seconds; the handle is not used.
static void yield(zhandle_t *zh, int i)
{
    (void)zh;
    sleep(i);
}
#else
// Non-threaded variant of yield(): drives the client by hand for roughly
// `seconds` seconds by repeatedly asking the library which descriptor and
// events it is interested in, waiting in select(), and then handing the
// observed events back through zookeeper_process().
// NOTE(review): this branch sits in the #else of an `#ifdef THREADED` that is
// itself nested inside the file-level `#ifdef THREADED`, so as written it is
// never compiled.
static void yield(zhandle_t *zh, int seconds)
{
int fd;
int interest;
int events;
struct timeval tv;
// NOTE(review): rc receives select()'s result but is never read afterwards.
int rc;
time_t expires = time(0) + seconds;
time_t timeLeft = seconds;
fd_set rfds, wfds, efds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
while(timeLeft >= 0) {
// Ask the client which fd to watch, for which events, and for how long.
zookeeper_interest(zh, &fd, &interest, &tv);
if (fd != -1) {
if (interest&ZOOKEEPER_READ) {
FD_SET(fd, &rfds);
} else {
FD_CLR(fd, &rfds);
}
if (interest&ZOOKEEPER_WRITE) {
FD_SET(fd, &wfds);
} else {
FD_CLR(fd, &wfds);
}
} else {
// No descriptor to watch: fall back to 0 so select() gets nfds == 1.
fd = 0;
}
// Descriptor 0 is always placed in the read set.
FD_SET(0, &rfds);
// Never wait longer than the time remaining for this yield.
if (tv.tv_sec > timeLeft) {
tv.tv_sec = timeLeft;
}
rc = select(fd+1, &rfds, &wfds, &efds, &tv);
timeLeft = expires - time(0);
// Translate the fd_set results back into ZOOKEEPER_* event bits.
events = 0;
if (FD_ISSET(fd, &rfds)) {
events |= ZOOKEEPER_READ;
}
if (FD_ISSET(fd, &wfds)) {
events |= ZOOKEEPER_WRITE;
}
zookeeper_process(zh, events);
}
}
#endif
// One watch notification as recorded by the test watcher: the znode path it
// was delivered for and the event type value passed to the watcher.
typedef struct evt {
string path;
int type;
} evt_t;
// Per-client watcher context used by the tests.
//
// Holds the connection flag maintained by the watcher callback, a
// mutex-protected queue of received (non-session) events, and the handle the
// context is attached to. The destructor closes that handle, so a watchctx_t
// declared on a test's stack tears the session down when the test returns.
typedef struct watchCtx {
private:
    list<evt_t> events;          // pending events, guarded by `mutex`
    // Not copyable: copying would lead to the handle being closed twice.
    watchCtx(const watchCtx&);
    watchCtx& operator=(const watchCtx&);
public:
    // NOTE(review): written by the watcher callback and polled by the test
    // code without taking `mutex` -- confirm this is acceptable for the tests.
    bool connected;
    zhandle_t *zh;
    Mutex mutex;

    watchCtx() {
        connected = false;
        zh = 0;
    }

    // Closes the attached handle, if any.
    ~watchCtx() {
        if (zh) {
            zookeeper_close(zh);
            zh = 0;
        }
    }

    // Removes and returns the oldest queued event.
    // Fails the current test (CppUnit assertion) when the queue is empty.
    // The emptiness check is evaluated under the lock, but the assertion is
    // raised only after the mutex has been released: CPPUNIT_ASSERT throws on
    // failure, and throwing with the mutex held would leave it locked.
    evt_t getEvent() {
        evt_t evt;
        mutex.acquire();
        const bool haveEvent = !events.empty();
        if (haveEvent) {
            evt = events.front();
            events.pop_front();
        }
        mutex.release();
        CPPUNIT_ASSERT_MESSAGE("events.size() > 0", haveEvent);
        return evt;
    }

    // Number of events currently queued.
    int countEvents() {
        int count;
        mutex.acquire();
        count = static_cast<int>(events.size());
        mutex.release();
        return count;
    }

    // Appends an event to the queue.
    void putEvent(evt_t evt) {
        mutex.acquire();
        events.push_back(evt);
        mutex.release();
    }

    // Polls (via yield) for up to 10 seconds until `connected` becomes true.
    // Returns the final value of `connected`.
    bool waitForConnected(zhandle_t *zh) {
        time_t expires = time(0) + 10;
        while(!connected && time(0) < expires) {
            yield(zh, 1);
        }
        return connected;
    }

    // Polls (via yield) for up to 15 seconds until `connected` becomes false.
    // Returns true when the context ended up disconnected.
    bool waitForDisconnected(zhandle_t *zh) {
        time_t expires = time(0) + 15;
        while(connected && time(0) < expires) {
            yield(zh, 1);
        }
        return !connected;
    }
} watchctx_t;
class Zookeeper_simpleSystem : public CPPUNIT_NS::TestFixture
{
// Suite registration: only the methods listed here are run by CppUnit.
// NOTE(review): testBadDescriptor is defined in this class but is not
// registered below.
// NOTE(review): the `#ifdef THREADED` inside this list is nested within the
// file-level `#ifdef THREADED`, so it does not exclude anything further.
// testIPV6 and testSSL are registered only when ZOO_IPV6_ENABLED /
// HAVE_OPENSSL_H are defined.
CPPUNIT_TEST_SUITE(Zookeeper_simpleSystem);
CPPUNIT_TEST(testLogCallbackSet);
CPPUNIT_TEST(testLogCallbackInit);
CPPUNIT_TEST(testLogCallbackClear);
CPPUNIT_TEST(testAsyncWatcherAutoReset);
CPPUNIT_TEST(testDeserializeString);
CPPUNIT_TEST(testFirstServerDown);
CPPUNIT_TEST(testNonexistentHost);
#ifdef THREADED
CPPUNIT_TEST(testNullData);
#ifdef ZOO_IPV6_ENABLED
CPPUNIT_TEST(testIPV6);
#endif
#ifdef HAVE_OPENSSL_H
CPPUNIT_TEST(testSSL);
#endif
CPPUNIT_TEST(testCreate);
CPPUNIT_TEST(testCreateContainer);
CPPUNIT_TEST(testCreateTtl);
CPPUNIT_TEST(testPath);
CPPUNIT_TEST(testPathValidation);
CPPUNIT_TEST(testPing);
CPPUNIT_TEST(testAcl);
CPPUNIT_TEST(testChroot);
CPPUNIT_TEST(testAuth);
CPPUNIT_TEST(testHangingClient);
CPPUNIT_TEST(testWatcherAutoResetWithGlobal);
CPPUNIT_TEST(testWatcherAutoResetWithLocal);
CPPUNIT_TEST(testGetChildren2);
CPPUNIT_TEST(testLastZxid);
CPPUNIT_TEST(testServersResolutionDelay);
CPPUNIT_TEST(testRemoveWatchers);
#endif
CPPUNIT_TEST_SUITE_END();
// Watcher callback shared by the tests. `v` is the watchctx_t the handle or
// watch was registered with.
//  - Every invocation refreshes ctx->connected from `state`.
//  - Every event other than a session event is queued on the context.
static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
    watchctx_t *ctx = (watchctx_t*)v;
    ctx->connected = (state == ZOO_CONNECTED_STATE);
    if (type == ZOO_SESSION_EVENT) {
        return;
    }
    evt_t received;
    received.path = path;
    received.type = type;
    ctx->putEvent(received);
}
// Host:port list of the server under test (declared here; the definition is
// not in this part of the file).
static const char hostPorts[];
// Accessor for hostPorts, used when building the server start/stop commands.
const char *getHostPorts() {
return hostPorts;
}
// Convenience overload: creates a client against the default hostPorts,
// forwarding to createClient(const char *, watchctx_t *).
zhandle_t *createClient(watchctx_t *ctx) {
return createClient(hostPorts, ctx);
}
// Creates a client against hostPorts through zookeeper_init2 so that
// `logCallback` is installed at creation time. The new handle is stored in
// ctx->zh, then the call pauses for one second before returning the handle.
zhandle_t *createClient(watchctx_t *ctx, log_callback_fn logCallback) {
    zhandle_t *const handle =
        zookeeper_init2(hostPorts, watcher, 10000, 0, ctx, 0, logCallback);
    ctx->zh = handle;
    sleep(1);
    return handle;
}
// Creates a client for the host list `hp` with the shared test watcher and a
// 10000 ms session timeout. The handle is stored in ctx->zh, then the call
// pauses for one second before returning the handle.
zhandle_t *createClient(const char *hp, watchctx_t *ctx) {
    zhandle_t *const handle = zookeeper_init(hp, watcher, 10000, 0, ctx, 0);
    ctx->zh = handle;
    sleep(1);
    return handle;
}
#ifdef HAVE_OPENSSL_H
// SSL flavour of createClient: connects to `hp` via zookeeper_init_ssl using
// the certificate specification `cert` and a 30000 ms session timeout. The
// handle is stored in ctx->zh, then the call pauses for one second.
zhandle_t *createSSLClient(const char *hp, const char *cert, watchctx_t *ctx) {
    zhandle_t *const handle =
        zookeeper_init_ssl(hp, cert, watcher, 30000, 0, ctx, 0);
    ctx->zh = handle;
    sleep(1);
    return handle;
}
#endif
// Creates a client from the connect string `chroot` (testChroot passes a
// host:port followed by a chroot path). The handle is stored in ctx->zh, then
// the call pauses for one second before returning the handle.
zhandle_t *createchClient(watchctx_t *ctx, const char* chroot) {
    zhandle_t *const handle = zookeeper_init(chroot, watcher, 10000, 0, ctx, 0);
    ctx->zh = handle;
    sleep(1);
    return handle;
}
FILE *logfile;
public:
Zookeeper_simpleSystem() {
logfile = openlogfile("Zookeeper_simpleSystem");
}
// Flushes and closes the fixture's log file when one was opened.
~Zookeeper_simpleSystem() {
    if (!logfile) {
        return;
    }
    fflush(logfile);
    fclose(logfile);
    logfile = 0;
}
// CppUnit per-test setup: directs the client library's log stream to the
// fixture's log file.
void setUp()
{
zoo_set_log_stream(logfile);
}
// Starts the test server by running "<ZKSERVER_CMD> start <hostPorts>" through
// system(). Fails the test when the command line does not fit the buffer or
// the command exits with a non-zero status.
void startServer() {
    char cmd[1024];
    // snprintf bounds the write; a result >= sizeof(cmd) means truncation.
    const int written = snprintf(cmd, sizeof(cmd), "%s start %s",
                                 ZKSERVER_CMD, getHostPorts());
    CPPUNIT_ASSERT(written > 0 && static_cast<size_t>(written) < sizeof(cmd));
    CPPUNIT_ASSERT(system(cmd) == 0);
}
// Stops the test server by running "<ZKSERVER_CMD> stop <hostPorts>" through
// system(). Fails the test when the command line does not fit the buffer or
// the command exits with a non-zero status.
void stopServer() {
    char cmd[1024];
    // snprintf bounds the write; a result >= sizeof(cmd) means truncation.
    const int written = snprintf(cmd, sizeof(cmd), "%s stop %s",
                                 ZKSERVER_CMD, getHostPorts());
    CPPUNIT_ASSERT(written > 0 && static_cast<size_t>(written) < sizeof(cmd));
    CPPUNIT_ASSERT(system(cmd) == 0);
}
// CppUnit per-test teardown: intentionally empty. Handles created through the
// createClient helpers are closed by the watchctx_t destructor.
void tearDown()
{
}
/**
 * Watcher that issues a synchronous call from inside the callback: it
 * re-registers itself on "/mytest" via zoo_wget_children and accepts either
 * ZOK or ZCLOSING as the outcome. Used by testHangingClient.
 * NOTE(review): the String_vector filled in here is not released in this
 * function -- confirm whether that matters for the test.
 **/
static void default_zoo_watcher(zhandle_t *zzh, int type, int state, const char *path, void *context){
    struct String_vector children = {0, NULL};
    const int zrc = zoo_wget_children(zzh, "/mytest", default_zoo_watcher, NULL, &children);
    CPPUNIT_ASSERT(zrc == ZOK || zrc == ZCLOSING);
}
/** ZOOKEEPER-1057 This checks that the client connects to the second server when the first is not reachable **/
void testFirstServerDown() {
watchctx_t ctx;
zoo_deterministic_conn_order(true);
zhandle_t* zk = createClient("127.0.0.1:22182,127.0.0.1:22181", &ctx);
CPPUNIT_ASSERT(zk != 0);
CPPUNIT_ASSERT(ctx.waitForConnected(zk));
}
/*
 * Verifies that an unresolvable first host does not prevent the client from
 * connecting to the next entry. Connection order is made deterministic for
 * the duration of the check and re-randomised afterwards.
 */
void testNonexistentHost() {
    watchctx_t ctx;
    char hostList[] = "jimmy:5555,127.0.0.1:22181";
    zoo_deterministic_conn_order(true /* disable permute */);
    zhandle_t *zh = createClient(hostList, &ctx);
    CPPUNIT_ASSERT(ctx.waitForConnected(zh));
    zoo_deterministic_conn_order(false /* enable permute */);
}
/** this checks for a deadlock in calling zookeeper_close and calls from a default watcher that might get triggered just when zookeeper_close() is in progress **/
void testHangingClient() {
int zrc;
char buff[10] = "testall";
char path[512];
watchctx_t *ctx = NULL;
struct String_vector str_vec = {0, NULL};
zhandle_t *zh = zookeeper_init(hostPorts, NULL, 10000, 0, ctx, 0);
sleep(1);
zrc = zoo_create(zh, "/mytest", buff, 10, &ZOO_OPEN_ACL_UNSAFE, 0, path, 512);
CPPUNIT_ASSERT_EQUAL((int)ZOK, zrc);
zrc = zoo_wget_children(zh, "/mytest", default_zoo_watcher, NULL, &str_vec);
CPPUNIT_ASSERT_EQUAL((int)ZOK, zrc);
zrc = zoo_create(zh, "/mytest/test1", buff, 10, &ZOO_OPEN_ACL_UNSAFE, 0, path, 512);
CPPUNIT_ASSERT_EQUAL((int)ZOK, zrc);
zrc = zoo_wget_children(zh, "/mytest", default_zoo_watcher, NULL, &str_vec);
CPPUNIT_ASSERT_EQUAL((int)ZOK, zrc);
zrc = zoo_delete(zh, "/mytest/test1", -1);
CPPUNIT_ASSERT_EQUAL((int)ZOK, zrc);
zookeeper_close(zh);
}
// Closes the handle's socket behind the client's back and checks, through the
// handle's io_count field, that the I/O path does not spin afterwards.
// NOTE(review): this method does not appear in the CPPUNIT_TEST list of this
// fixture, so it is not run as part of the suite.
void testBadDescriptor() {
watchctx_t *ctx = NULL;
zhandle_t *zh = zookeeper_init(hostPorts, NULL, 10000, 0, ctx, 0);
sleep(1);
// Reset the counter, then invalidate the descriptor.
zh->io_count = 0;
//close socket
close_zsock(zh->fd);
sleep(1);
//Check that doIo isn't spinning
CPPUNIT_ASSERT(zh->io_count < 2);
zookeeper_close(zh);
}
/* Checks the zoo_set_servers_resolution_delay default and operation */
// The test reads the handle's resolve_delay_ms and last_resolve fields
// directly and compares last_resolve snapshots taken before and after a
// zoo_exists request to tell whether a resolution happened.
// NOTE(review): cases (d) and (e) depend on wall-clock timing (10 ms / 500 ms
// sleeps against a 500 ms delay), so they may be sensitive to a slow host.
void testServersResolutionDelay() {
watchctx_t ctx;
zhandle_t *zk = createClient(&ctx);
int rc;
struct timeval tv;
struct Stat stat;
CPPUNIT_ASSERT(zk);
CPPUNIT_ASSERT(zk->resolve_delay_ms == 0);
// a) Default/0 case: resolve at each request.
tv = zk->last_resolve;
usleep(10000); // 10ms
rc = zoo_exists(zk, "/", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
// Must have changed because of the request.
CPPUNIT_ASSERT(zk->last_resolve.tv_sec != tv.tv_sec ||
zk->last_resolve.tv_usec != tv.tv_usec);
// b) Disabled/-1 case: never perform "routine" resolutions.
rc = zoo_set_servers_resolution_delay(zk, -1); // Disabled
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
tv = zk->last_resolve;
usleep(10000); // 10ms
rc = zoo_exists(zk, "/", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
// Must not have changed as auto-resolution is disabled.
CPPUNIT_ASSERT(zk->last_resolve.tv_sec == tv.tv_sec &&
zk->last_resolve.tv_usec == tv.tv_usec);
// c) Invalid delay is rejected.
rc = zoo_set_servers_resolution_delay(zk, -1000); // Bad
CPPUNIT_ASSERT_EQUAL((int)ZBADARGUMENTS, rc);
// d) Valid delay, no resolution within window.
rc = zoo_set_servers_resolution_delay(zk, 500); // 0.5s
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
tv = zk->last_resolve;
usleep(10000); // 10ms
rc = zoo_exists(zk, "/", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
// Must not have changed because the request (hopefully!)
// executed in less than 0.5s.
CPPUNIT_ASSERT(zk->last_resolve.tv_sec == tv.tv_sec &&
zk->last_resolve.tv_usec == tv.tv_usec);
// e) Valid delay, at least one resolution after delay.
// `tv` still holds the snapshot taken in (d).
usleep(500 * 1000); // 0.5s
rc = zoo_exists(zk, "/", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
// Must have changed because we waited 0.5s between the
// capture and the last request.
CPPUNIT_ASSERT(zk->last_resolve.tv_sec != tv.tv_sec ||
zk->last_resolve.tv_usec != tv.tv_usec);
}
void testPing()
{
watchctx_t ctxIdle;
watchctx_t ctxWC;
zhandle_t *zkIdle = createClient(&ctxIdle);
zhandle_t *zkWatchCreator = createClient(&ctxWC);
CPPUNIT_ASSERT(zkIdle);
CPPUNIT_ASSERT(zkWatchCreator);
char path[80];
sprintf(path, "/testping");
int rc = zoo_create(zkWatchCreator, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
for(int i = 0; i < 30; i++) {
sprintf(path, "/testping/%i", i);
rc = zoo_create(zkWatchCreator, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
}
for(int i = 0; i < 30; i++) {
sprintf(path, "/testping/%i", i);
struct Stat stat;
rc = zoo_exists(zkIdle, path, 1, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
}
for(int i = 0; i < 30; i++) {
sprintf(path, "/testping/%i", i);
usleep(500000);
rc = zoo_delete(zkWatchCreator, path, -1);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
}
struct Stat stat;
CPPUNIT_ASSERT_EQUAL((int)ZNONODE, zoo_exists(zkIdle, "/testping/0", 0, &stat));
}
// Polls `ctx` (yielding one second at a time) until it has at least one
// queued event or `seconds` have elapsed. Returns whether an event is queued.
bool waitForEvent(zhandle_t *zh, watchctx_t *ctx, int seconds) {
    const time_t deadline = time(0) + seconds;
    for (;;) {
        if (ctx->countEvents() != 0 || !(time(0) < deadline)) {
            break;
        }
        yield(zh, 1);
    }
    return ctx->countEvents() > 0;
}
// Number of watch contexts/paths used by testAsyncWatcherAutoReset.
#define COUNT 100
// Handle used by stringCompletion to retry a create after a connection loss.
static zhandle_t *async_zk;
// Completion/watch counter: incremented by the completion and watcher
// callbacks, decremented by the waitFor* helpers.
// NOTE(review): `volatile` alone does not synchronise the callbacks with the
// test code -- confirm that this is sufficient for these tests.
static volatile int count;
static const char* hp_chroot;
// Stat completion: `data` carries the expected return code (smuggled through
// the pointer value) and must equal the actual `rc`.
static void statCompletion(int rc, const struct Stat *stat, const void *data) {
    const int expectedRc = (int) (long) data;
    CPPUNIT_ASSERT_EQUAL(expectedRc, rc);
}
// String completion for the asynchronous creates. `data`, when non-null, is a
// heap-allocated copy of the path (the callers pass strdup(path)) that this
// callback owns and frees.
// On ZCONNECTIONLOSS the create is re-issued once through async_zk, this time
// without a data pointer. Any other outcome is ignored.
static void stringCompletion(int rc, const char *value, const void *data) {
    char *ownedPath = (char*)data;
    if (!ownedPath) {
        return;
    }
    if (rc == ZCONNECTIONLOSS) {
        // Try again
        zoo_acreate(async_zk, ownedPath, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, stringCompletion, 0);
    }
    free(ownedPath);
}
// String+Stat completion used with zoo_acreate2: delegates to
// stringCompletion (retry on connection loss, release of `data`) and then
// requires that a Stat was supplied. The delegation runs first so `data` has
// already been handled if the assertion throws.
static void stringStatCompletion(int rc, const char *value, const struct Stat *stat,
const void *data) {
stringCompletion(rc, value, data);
CPPUNIT_ASSERT(stat != 0);
}
// Completion for asynchronous creates in testChroot: requires ZOK and, when
// `data` is given, that the returned path equals the string `data` points to.
// Bumps `count` so waitForCreateCompletion can observe the completion.
static void create_completion_fn(int rc, const char* value, const void *data) {
    CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
    const char *wanted = (const char *)data;
    if (wanted) {
        CPPUNIT_ASSERT_EQUAL(string(wanted), string(value));
    }
    count++;
}
// Waits up to `seconds` for a create completion to bump `count`, then
// consumes one tick. The decrement is unconditional, so after a timeout
// `count` ends up below zero and the caller's `count == 0` check fails.
static void waitForCreateCompletion(int seconds) {
    for (const time_t deadline = time(0) + seconds;
         count == 0 && time(0) < deadline; ) {
        sleep(1);
    }
    count--;
}
// Watcher used by testChroot. The watcher context is the client-side path the
// watch was set on; the path reported by the event must match it exactly.
// Bumps `count` so waitForChrootWatch can observe the notification.
static void watcher_chroot_fn(zhandle_t *zh, int type,
                              int state, const char *path,void *watcherCtx) {
    // check for path
    char *client_path = (char *) watcherCtx;
    CPPUNIT_ASSERT(strcmp(client_path, path) == 0);
    count++;
}
// Waits up to `seconds` for the chroot watcher to bump `count`, then consumes
// one tick. The decrement is unconditional, so after a timeout `count` ends up
// below zero and the caller's `count == 0` check fails.
static void waitForChrootWatch(int seconds) {
    for (const time_t deadline = time(0) + seconds;
         count == 0 && time(0) < deadline; ) {
        sleep(1);
    }
    count--;
}
// Waits up to `seconds` for voidCompletion to bump `count`, then consumes one
// tick. The decrement is unconditional, so after a timeout `count` ends up
// below zero and the caller's `count == 0` check fails.
static void waitForVoidCompletion(int seconds) {
    for (const time_t deadline = time(0) + seconds;
         count == 0 && time(0) < deadline; ) {
        sleep(1);
    }
    count--;
}
// Void completion (used with zoo_add_auth): `data` carries the expected return
// code in its pointer value and must equal `rc`. Bumps `count` afterwards.
static void voidCompletion(int rc, const void *data) {
    const int expectedRc = (int) (long) data;
    CPPUNIT_ASSERT_EQUAL(expectedRc, rc);
    count++;
}
// Asserts that a plain create of `path` is rejected with ZBADARGUMENTS.
static void verifyCreateFails(const char *path, zhandle_t *zk) {
    const int outcome = zoo_create(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int)ZBADARGUMENTS, outcome);
}
// Asserts that a plain create of `path` succeeds with ZOK.
static void verifyCreateOk(const char *path, zhandle_t *zk) {
    const int outcome = zoo_create(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, outcome);
}
// Asserts that a ZOO_SEQUENCE create of `path` is rejected with ZBADARGUMENTS.
static void verifyCreateFailsSeq(const char *path, zhandle_t *zk) {
    const int outcome = zoo_create(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int)ZBADARGUMENTS, outcome);
}
// Asserts that a ZOO_SEQUENCE create of `path` succeeds with ZOK.
static void verifyCreateOkSeq(const char *path, zhandle_t *zk) {
    const int outcome = zoo_create(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, outcome);
}
/**
returns false if the vectors dont match
**/
bool compareAcl(struct ACL_vector acl1, struct ACL_vector acl2) {
if (acl1.count != acl2.count) {
return false;
}
struct ACL *aclval1 = acl1.data;
struct ACL *aclval2 = acl2.data;
if (aclval1->perms != aclval2->perms) {
return false;
}
struct Id id1 = aclval1->id;
struct Id id2 = aclval2->id;
if (strcmp(id1.scheme, id2.scheme) != 0) {
return false;
}
if (strcmp(id1.id, id2.id) != 0) {
return false;
}
return true;
}
// Serializes the int -1 into a buffer archive and then tries to read that
// buffer back as a string; the deserializer must reject it with -EINVAL.
// NOTE(review): neither archive is closed in this test -- confirm whether that
// is intentional.
void testDeserializeString() {
    int negativeOne = -1;
    struct oarchive *out = create_buffer_oarchive();
    out->serialize_Int(out, "int", &negativeOne);

    // Reach into the archive's private buffer to feed it to an input archive.
    struct buff_struct_2 *raw = (struct buff_struct_2 *) out->priv;
    struct iarchive *in = create_buffer_iarchive(raw->buffer, raw->len);

    char *decoded;
    const int status = in->deserialize_String(in, "string", &decoded);
    CPPUNIT_ASSERT_EQUAL(-EINVAL, status);
}
// Round-trips ACLs on "/acl": the node is created with ZOO_OPEN_ACL_UNSAFE and
// must report that ACL; after zoo_set_acl with ZOO_READ_ACL_UNSAFE it must
// report the new one.
// NOTE(review): the ACL vectors returned by zoo_get_acl are not released in
// this test -- confirm whether that is intentional.
void testAcl() {
int rc;
struct ACL_vector aclvec;
struct Stat stat;
watchctx_t ctx;
zhandle_t *zk = createClient(&ctx);
rc = zoo_create(zk, "/acl", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
rc = zoo_get_acl(zk, "/acl", &aclvec, &stat );
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
bool cmp = compareAcl(ZOO_OPEN_ACL_UNSAFE, aclvec);
CPPUNIT_ASSERT_EQUAL(true, cmp);
// Replace the ACL (version -1) and read it back.
rc = zoo_set_acl(zk, "/acl", -1, &ZOO_READ_ACL_UNSAFE);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_get_acl(zk, "/acl", &aclvec, &stat);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
cmp = compareAcl(ZOO_READ_ACL_UNSAFE, aclvec);
CPPUNIT_ASSERT_EQUAL(true, cmp);
}
// Exercises zoo_add_auth end to end: argument validation, digest auth with
// right and wrong credentials, re-sending of auth info after a server
// restart, auth added before the connection is established (ZOOKEEPER-1108),
// and finally zookeeper_get_connected_host across a server restart.
//
// Each add_auth completion is observed through the shared `count` counter:
// voidCompletion increments it, waitForVoidCompletion decrements it, so
// `count == 0` after a wait means exactly one completion arrived.
void testAuth() {
int rc;
count = 0;
watchctx_t ctx1, ctx2, ctx3, ctx4, ctx5;
zhandle_t *zk = createClient(&ctx1);
struct ACL_vector nodeAcl;
struct ACL acl_val;
// Null handle and null scheme must both be rejected up front.
rc = zoo_add_auth(0, "", 0, 0, voidCompletion, (void*)-1);
CPPUNIT_ASSERT_EQUAL((int) ZBADARGUMENTS, rc);
rc = zoo_add_auth(zk, 0, 0, 0, voidCompletion, (void*)-1);
CPPUNIT_ASSERT_EQUAL((int) ZBADARGUMENTS, rc);
// auth as pat, create /tauth1, close session
rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion,
(void*)ZOK);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
waitForVoidCompletion(3);
CPPUNIT_ASSERT(count == 0);
rc = zoo_create(zk, "/tauth1", "", 0, &ZOO_CREATOR_ALL_ACL, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
{
//create a new client
// Empty digest credentials, added twice, are expected to complete with ZOK.
zk = createClient(&ctx4);
rc = zoo_add_auth(zk, "digest", "", 0, voidCompletion, (void*)ZOK);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
waitForVoidCompletion(3);
CPPUNIT_ASSERT(count == 0);
rc = zoo_add_auth(zk, "digest", "", 0, voidCompletion, (void*)ZOK);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
waitForVoidCompletion(3);
CPPUNIT_ASSERT(count == 0);
}
//create a new client
// Wrong password: the auth call itself completes, but the read is denied.
zk = createClient(&ctx2);
rc = zoo_add_auth(zk, "digest", "pat:passwd2", 11, voidCompletion,
(void*)ZOK);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
waitForVoidCompletion(3);
CPPUNIT_ASSERT(count == 0);
char buf[1024];
int blen = sizeof(buf);
struct Stat stat;
rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZNOAUTH, rc);
// add auth pat w/correct pass verify success
rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion,
(void*)ZOK);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
waitForVoidCompletion(3);
CPPUNIT_ASSERT(count == 0);
//create a new client
// NOTE(review): the return codes of the next two zoo_add_auth calls are
// stored in rc but not asserted.
zk = createClient(&ctx3);
rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion, (void*) ZOK);
waitForVoidCompletion(3);
CPPUNIT_ASSERT(count == 0);
rc = zoo_add_auth(zk, "ip", "none", 4, voidCompletion, (void*)ZOK);
//make the server forget the auths
waitForVoidCompletion(3);
CPPUNIT_ASSERT(count == 0);
stopServer();
CPPUNIT_ASSERT(ctx3.waitForDisconnected(zk));
startServer();
CPPUNIT_ASSERT(ctx3.waitForConnected(zk));
// now try getting the data
rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
// also check for get
rc = zoo_get_acl(zk, "/", &nodeAcl, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
// check if the acl has all the perms
CPPUNIT_ASSERT_EQUAL((int)1, (int)nodeAcl.count);
acl_val = *(nodeAcl.data);
CPPUNIT_ASSERT_EQUAL((int) acl_val.perms, ZOO_PERM_ALL);
// verify on root node
rc = zoo_set_acl(zk, "/", -1, &ZOO_CREATOR_ALL_ACL);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_set_acl(zk, "/", -1, &ZOO_OPEN_ACL_UNSAFE);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
//[ZOOKEEPER-1108], test that auth info is sent to server, if client is not
//connected to server when zoo_add_auth was called.
// No sleep after zookeeper_init here: add_auth is issued immediately, and the
// completion is expected to have fired once within the following 2 seconds.
zhandle_t *zk_auth = zookeeper_init(hostPorts, NULL, 10000, 0, NULL, 0);
rc = zoo_add_auth(zk_auth, "digest", "pat:passwd", 10, voidCompletion, (void*)ZOK);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
sleep(2);
CPPUNIT_ASSERT(count == 1);
count = 0;
CPPUNIT_ASSERT_EQUAL((int) ZOK, zookeeper_close(zk_auth));
// zookeeper_get_connected_host must return NULL while disconnected and a
// non-NULL address once the connection is back.
struct sockaddr addr;
socklen_t addr_len = sizeof(addr);
zk = createClient(&ctx5);
stopServer();
CPPUNIT_ASSERT(ctx5.waitForDisconnected(zk));
CPPUNIT_ASSERT(zookeeper_get_connected_host(zk, &addr, &addr_len) == NULL);
addr_len = sizeof(addr);
startServer();
CPPUNIT_ASSERT(ctx5.waitForConnected(zk));
CPPUNIT_ASSERT(zookeeper_get_connected_host(zk, &addr, &addr_len) != NULL);
}
// Creates two nodes with zoo_create2 and checks, for each, that the returned
// path matches the requested one and that the returned Stat is populated
// (czxid and mtime both positive). The two Stats must not be identical.
void testCreate() {
watchctx_t ctx;
int rc = 0;
zhandle_t *zk = createClient(&ctx);
CPPUNIT_ASSERT(zk);
char pathbuf[80];
struct Stat stat_a = {0};
struct Stat stat_b = {0};
rc = zoo_create2(zk, "/testcreateA", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, sizeof(pathbuf), &stat_a);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT(strcmp(pathbuf, "/testcreateA") == 0);
CPPUNIT_ASSERT(stat_a.czxid > 0);
CPPUNIT_ASSERT(stat_a.mtime > 0);
rc = zoo_create2(zk, "/testcreateB", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, sizeof(pathbuf), &stat_b);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT(strcmp(pathbuf, "/testcreateB") == 0);
CPPUNIT_ASSERT(stat_b.czxid > 0);
CPPUNIT_ASSERT(stat_b.mtime > 0);
// Should get different Stats back from different creates
CPPUNIT_ASSERT(Stat_eq(&stat_a, &stat_b) != 1);
}
void testCreateContainer() {
watchctx_t ctx;
int rc = 0;
zhandle_t *zk = createClient(&ctx);
CPPUNIT_ASSERT(zk);
char pathbuf[80];
struct Stat stat = {0};
rc = zoo_create2(zk, "/testContainer", "", 0, &ZOO_OPEN_ACL_UNSAFE,
ZOO_CONTAINER, pathbuf, sizeof(pathbuf), &stat);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
}
void testCreateTtl() {
watchctx_t ctx;
int rc = 0;
zhandle_t *zk = createClient(&ctx);
CPPUNIT_ASSERT(zk);
char pathbuf[80];
struct Stat stat = {0};
rc = zoo_create2_ttl(zk, "/testTtl", "", 0, &ZOO_OPEN_ACL_UNSAFE,
ZOO_PERSISTENT_WITH_TTL, 1, pathbuf, sizeof(pathbuf), &stat);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
sleep(1);
rc = zoo_exists(zk, "/testTtl", 1, &stat);
CPPUNIT_ASSERT_EQUAL((int) ZNONODE, rc);
}
// Creates "/parent" with four children, then checks that the Stat returned by
// zoo_get_children2 equals the one returned by zoo_exists for the same node
// and that it reports four children.
void testGetChildren2() {
    static const char *const kNodes[] = {
        "/parent",
        "/parent/child_a",
        "/parent/child_b",
        "/parent/child_c",
        "/parent/child_d",
    };
    int rc;
    watchctx_t ctx;
    zhandle_t *zk = createClient(&ctx);

    // Parent first, then the children, each create checked individually.
    for (size_t n = 0; n < sizeof(kNodes) / sizeof(kNodes[0]); ++n) {
        rc = zoo_create(zk, kNodes[n], "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    }

    struct String_vector strings;
    struct Stat stat_a, stat_b;
    rc = zoo_get_children2(zk, "/parent", 0, &strings, &stat_a);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    rc = zoo_exists(zk, "/parent", 0, &stat_b);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

    CPPUNIT_ASSERT(Stat_eq(&stat_a, &stat_b));
    CPPUNIT_ASSERT(stat_a.numChildren == 4);
}
// Connects using the connect string "::1:22181" and creates a node with NULL
// data to prove the session is usable.
void testIPV6() {
    watchctx_t ctx;
    zhandle_t *zk = createClient("::1:22181", &ctx);
    CPPUNIT_ASSERT(zk);
    const int status = zoo_create(zk, "/ipv6", NULL, -1,
                                  &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int) ZOK, status);
}
#ifdef HAVE_OPENSSL_H
// Connects to 127.0.0.1:22281 over SSL using the certificate files under
// /tmp/certs and creates a node with NULL data to prove the session is usable.
// Debug-level logging is switched on first.
void testSSL() {
    watchctx_t ctx;
    zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
    zhandle_t *zk = createSSLClient("127.0.0.1:22281", "/tmp/certs/server.crt,/tmp/certs/client.crt,/tmp/certs/clientkey.pem,password", &ctx);
    CPPUNIT_ASSERT(zk);
    const int status = zoo_create(zk, "/ssl", NULL, -1,
                                  &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int) ZOK, status);
}
#endif
// A node created or set with NULL data (length -1) must read back with a
// reported length of -1.
void testNullData() {
watchctx_t ctx;
zhandle_t *zk = createClient(&ctx);
CPPUNIT_ASSERT(zk);
int rc = 0;
rc = zoo_create(zk, "/mahadev", NULL, -1,
&ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
char buffer[512];
struct Stat stat;
int len = 512;
rc = zoo_wget(zk, "/mahadev", NULL, NULL, buffer, &len, &stat);
CPPUNIT_ASSERT_EQUAL( -1, len);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_set(zk, "/mahadev", NULL, -1, -1);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
// NOTE(review): `len` is not reset to the buffer size here; it still holds the
// -1 written by the previous zoo_wget -- confirm this is intended.
rc = zoo_wget(zk, "/mahadev", NULL, NULL, buffer, &len, &stat);
CPPUNIT_ASSERT_EQUAL( -1, len);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
}
// Checks how zoo_create fills the caller's path buffer for various buffer
// lengths. As the assertions below pin down:
//   length 0      -> buffer untouched (still holds the 'X' fill)
//   length 1      -> empty string
//   length 2 / 3  -> path truncated to "/" / "/t"
//   length 15, 16 -> full 14-character path
void testPath() {
watchctx_t ctx;
char pathbuf[20];
zhandle_t *zk = createClient(&ctx);
CPPUNIT_ASSERT(zk);
int rc = 0;
// Pre-fill so an untouched buffer is detectable.
memset(pathbuf, 'X', 20);
rc = zoo_create(zk, "/testpathpath0", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT_EQUAL('X', pathbuf[0]);
rc = zoo_create(zk, "/testpathpath1", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, 1);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT(strlen(pathbuf) == 0);
rc = zoo_create(zk, "/testpathpath2", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, 2);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT(strcmp(pathbuf, "/") == 0);
rc = zoo_create(zk, "/testpathpath3", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, 3);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT(strcmp(pathbuf, "/t") == 0);
rc = zoo_create(zk, "/testpathpath7", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, 15);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT(strcmp(pathbuf, "/testpathpath7") == 0);
rc = zoo_create(zk, "/testpathpath8", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, pathbuf, 16);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT(strcmp(pathbuf, "/testpathpath8") == 0);
}
// Path validation: drives zoo_create with a series of malformed and
// well-formed paths. The tables are walked in order; later entries depend on
// nodes created by earlier ones (e.g. "/f" before "/f/.f").
void testPathValidation() {
    watchctx_t ctx;
    zhandle_t *zk = createClient(&ctx);
    CPPUNIT_ASSERT(zk);

    // Paths a plain create must reject (the first entry is a null path).
    static const char *const kRejected[] = {
        0, "", "//", "///", "////",
        "/.", "/..", "/./", "/../",
        "/foo/./", "/foo/../", "/foo/.", "/foo/..",
        "/./.", "/../..",
        "/foo/bar/", "/foo//bar", "/foo/bar//",
        "foo", "a",
    };
    for (size_t n = 0; n < sizeof(kRejected) / sizeof(kRejected[0]); ++n) {
        verifyCreateFails(kRejected[n], zk);
    }

    // verify that trailing fails, except for seq which adds suffix
    verifyCreateOk("/createseq", zk);
    verifyCreateFails("/createseq/", zk);

    static const char *const kSeqAccepted[] = {
        "/createseq/", "/createseq/.", "/createseq/..",
    };
    for (size_t n = 0; n < sizeof(kSeqAccepted) / sizeof(kSeqAccepted[0]); ++n) {
        verifyCreateOkSeq(kSeqAccepted[n], zk);
    }

    static const char *const kSeqRejected[] = {
        "/createseq//", "/createseq/./", "/createseq/../",
    };
    for (size_t n = 0; n < sizeof(kSeqRejected) / sizeof(kSeqRejected[0]); ++n) {
        verifyCreateFailsSeq(kSeqRejected[n], zk);
    }

    // Dots are fine as long as a path element is not exactly "." or "..".
    static const char *const kAccepted[] = {
        "/.foo", "/.f.", "/..f", "/..f..", "/f.c",
        "/f", "/f/.f", "/f/f.", "/f/..f", "/f/f..",
        "/f/.f/f", "/f/f./f",
    };
    for (size_t n = 0; n < sizeof(kAccepted) / sizeof(kAccepted[0]); ++n) {
        verifyCreateOk(kAccepted[n], zk);
    }
}
// Chroot test: `zk_ch` is created with the connect string
// "127.0.0.1:22181/testch1/mahadev" and `zk` with the plain host list. The
// test checks that paths given to and reported by `zk_ch` are relative to the
// chroot, by cross-checking through `zk` with absolute paths.
// Watches and async completions are observed through the shared `count`
// counter (incremented by the callback, decremented by the waitFor* helper).
void testChroot() {
// The C client's async callbacks do not call back with the path, so we do
// not need to test that for now. We should fix that soon, though!
watchctx_t ctx, ctx_ch;
zhandle_t *zk, *zk_ch;
char buf[60];
int rc, len;
struct Stat stat;
const char* data = "garbage";
const char* retStr = "/chroot";
const char* root= "/";
zk_ch = createchClient(&ctx_ch, "127.0.0.1:22181/testch1/mahadev");
CPPUNIT_ASSERT(zk_ch != NULL);
zk = createClient(&ctx);
// first test with a NULL zk handle, make sure client library does not
// dereference a null pointer, but instead returns ZBADARGUMENTS
rc = zoo_create(NULL, "/testch1", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZBADARGUMENTS, rc);
// Build the chroot directory itself through the un-chrooted client.
rc = zoo_create(zk, "/testch1", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_create(zk, "/testch1/mahadev", data, 7, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
// try a get with /
len = 60;
rc = zoo_get(zk_ch, "/", 0, buf, &len, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
//check if the data is the same
CPPUNIT_ASSERT(strncmp(buf, data, 7) == 0);
//check for watches
// NOTE(review): the return code of this zoo_wexists is not asserted.
rc = zoo_wexists(zk_ch, "/chroot", watcher_chroot_fn, (void *) retStr, &stat);
//now check if we can do create/delete/get/sets/acls/getChildren and others
//check create
rc = zoo_create(zk_ch, "/chroot", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0,0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
waitForChrootWatch(3);
CPPUNIT_ASSERT(count == 0);
rc = zoo_create(zk_ch, "/chroot/child", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
// The same node must be visible under its absolute path.
rc = zoo_exists(zk, "/testch1/mahadev/chroot/child", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_delete(zk_ch, "/chroot/child", -1);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_exists(zk, "/testch1/mahadev/chroot/child", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int) ZNONODE, rc);
rc = zoo_wget(zk_ch, "/chroot", watcher_chroot_fn, (char*) retStr,
buf, &len, &stat);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_set(zk_ch, "/chroot",buf, 3, -1);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
waitForChrootWatch(3);
CPPUNIT_ASSERT(count == 0);
// check for getchildren
struct String_vector children;
rc = zoo_get_children(zk_ch, "/", 0, &children);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
CPPUNIT_ASSERT_EQUAL((int)1, (int)children.count);
//check that the only child is "chroot"
CPPUNIT_ASSERT(strcmp((retStr+1), children.data[0]) == 0);
// check for get/set acl
struct ACL_vector acl;
rc = zoo_get_acl(zk_ch, "/", &acl, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
CPPUNIT_ASSERT_EQUAL((int)1, (int)acl.count);
CPPUNIT_ASSERT_EQUAL((int)ZOO_PERM_ALL, (int)acl.data->perms);
// set acl
rc = zoo_set_acl(zk_ch, "/chroot", -1, &ZOO_READ_ACL_UNSAFE);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
// see if you can add children (must be denied under the read-only ACL)
rc = zoo_create(zk_ch, "/chroot/child1", "",0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZNOAUTH, rc);
//add wget children test
rc = zoo_wget_children(zk_ch, "/", watcher_chroot_fn, (char*) root, &children);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
//now create a node
rc = zoo_create(zk_ch, "/child2", "",0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
waitForChrootWatch(3);
CPPUNIT_ASSERT(count == 0);
//check for one async call just to make sure
rc = zoo_acreate(zk_ch, "/child3", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
create_completion_fn, 0);
waitForCreateCompletion(3);
CPPUNIT_ASSERT(count == 0);
//ZOOKEEPER-1027 correctly return path_buffer without prefixed chroot
const char* path = "/zookeeper1027";
char path_buffer[1024];
int path_buffer_len=sizeof(path_buffer);
rc = zoo_create(zk_ch, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, path_buffer, path_buffer_len);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT_EQUAL(string(path), string(path_buffer));
// Async create: the completion compares the returned path against the
// requested (chroot-relative) path passed as completion data.
const char* path2282 = "/zookeeper2282";
rc = zoo_acreate(zk_ch, path2282, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
create_completion_fn, path2282);
waitForCreateCompletion(3);
CPPUNIT_ASSERT(count == 0);
}
// Test creating normal handle via zookeeper_init then explicitly setting callback.
// Requires that no message has been captured before this test runs; after the
// callback is installed, exactly the ten messages logged here must arrive.
void testLogCallbackSet()
{
    watchctx_t ctx;
    CPPUNIT_ASSERT(logMessages.empty());
    zhandle_t *zk = createClient(&ctx);
    zoo_set_log_callback(zk, &logMessageHandler);
    CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
    // Log 10 messages and ensure all go to callback
    const std::size_t expected = 10;
    for (std::size_t i = 0; i < expected; i++)
    {
        // %d expects an int; the loop index is a size_t, so convert explicitly.
        LOG_INFO(LOGCALLBACK(zk), "%s #%d", __FUNCTION__, static_cast<int>(i));
    }
    CPPUNIT_ASSERT(expected == logMessages.size());
}
// Test creating handle via zookeeper_init2 to ensure all connection messages go to callback.
// The callback is installed at creation time, so some messages must already
// have been captured before the ten explicit ones are logged.
void testLogCallbackInit()
{
    logMessages.clear();
    watchctx_t ctx;
    zhandle_t *zk = createClient(&ctx, &logMessageHandler);
    CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
    // All the connection messages should have gone to the callback -- don't
    // want this to be a maintenance issue so we're not asserting exact count
    std::size_t numBefore = logMessages.size();
    CPPUNIT_ASSERT(numBefore != 0);
    // Log 10 messages and ensure all go to callback
    const std::size_t expected = 10;
    for (std::size_t i = 0; i < expected; i++)
    {
        // %d expects an int; the loop index is a size_t, so convert explicitly.
        LOG_INFO(LOGCALLBACK(zk), "%s #%d", __FUNCTION__, static_cast<int>(i));
    }
    CPPUNIT_ASSERT(logMessages.size() == numBefore + expected);
}
// Test clearing log callback -- logging should resume going to logstream.
// After the callback is reset to NULL, a further log message must not change
// the number of captured messages.
void testLogCallbackClear()
{
    logMessages.clear();
    watchctx_t ctx;
    zhandle_t *zk = createClient(&ctx, &logMessageHandler);
    CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
    // All the connection messages should have gone to the callback -- again, we don't
    // want this to be a maintenance issue so we're not asserting exact count
    int numBefore = logMessages.size();
    CPPUNIT_ASSERT(numBefore > 0);
    // Clear log_callback
    zoo_set_log_callback(zk, NULL);
    // Future log messages should go to logstream not callback.
    // The function name is passed as an argument, never as the format string.
    LOG_INFO(LOGCALLBACK(zk), "%s", __FUNCTION__);
    int numAfter = logMessages.size();
    CPPUNIT_ASSERT_EQUAL(numBefore, numAfter);
}
// Exercises exists-watches registered through the async API, before and
// after a server restart.
//
// Steps, as performed below:
//  1. register an exists-watch (zoo_awexists) on /awar0 .. /awar<COUNT-1>,
//     each with its own watchctx_t taken from lctx[];
//  2. create the first quarter of those nodes with zoo_acreate and the second
//     quarter with zoo_acreate2, then expect a ZOO_CREATED_EVENT carrying the
//     matching path on each of those contexts;
//  3. create /awar<COUNT/2+1> .. /awar<COUNT*10-1>, stop and start the
//     server, and expect a ZOO_CREATED_EVENT for indices COUNT/2+1 .. COUNT-1.
//
// NOTE(review): index COUNT/2 is neither created nor checked -- confirm that
// the "+ 1" lower bounds in step 3 are intentional.
void testAsyncWatcherAutoReset()
{
watchctx_t ctx;
zhandle_t *zk = createClient(&ctx);
watchctx_t lctx[COUNT];
int i;
char path[80];
int rc;
evt_t evt;
// Store the handle in the class-level static; presumably read by the
// completion callbacks -- TODO confirm.
async_zk = zk;
// Step 1: one exists-watch per path. ZNONODE is handed to statCompletion as
// its data pointer (presumably the result code it should expect -- verify).
for(i = 0; i < COUNT; i++) {
sprintf(path, "/awar%d", i);
rc = zoo_awexists(zk, path, watcher, &lctx[i], statCompletion, (void*)ZNONODE);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
}
yield(zk, 0);
// Step 2a: first quarter created with zoo_acreate. Each completion receives
// a strdup'ed copy of the path (presumably released by the completion --
// verify against stringCompletion).
for(i = 0; i < COUNT/4; i++) {
sprintf(path, "/awar%d", i);
rc = zoo_acreate(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
stringCompletion, strdup(path));
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
}
// Step 2b: second quarter created with zoo_acreate2.
for(i = COUNT/4; i < COUNT/2; i++) {
sprintf(path, "/awar%d", i);
rc = zoo_acreate2(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0,
stringStatCompletion, strdup(path));
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
}
yield(zk, 3);
// Every node created so far must have produced a created-event on its own
// context, reporting the same path.
for(i = 0; i < COUNT/2; i++) {
sprintf(path, "/awar%d", i);
CPPUNIT_ASSERT_MESSAGE(path, waitForEvent(zk, &lctx[i], 5));
evt = lctx[i].getEvent();
CPPUNIT_ASSERT_EQUAL_MESSAGE(evt.path.c_str(), ZOO_CREATED_EVENT, evt.type);
CPPUNIT_ASSERT_EQUAL(string(path), evt.path);
}
// Step 3: issue creates for the remaining watched paths and for many more
// paths (up to COUNT*10 - 1) that have no watch registered in this test.
for(i = COUNT/2 + 1; i < COUNT*10; i++) {
sprintf(path, "/awar%d", i);
rc = zoo_acreate(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, stringCompletion, strdup(path));
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
}
yield(zk, 1);
// Restart the server and wait for the handle to notice both transitions.
stopServer();
CPPUNIT_ASSERT(ctx.waitForDisconnected(zk));
startServer();
CPPUNIT_ASSERT(ctx.waitForConnected(zk));
yield(zk, 3);
// The remaining watched paths must each report a created-event.
for(i = COUNT/2+1; i < COUNT; i++) {
sprintf(path, "/awar%d", i);
CPPUNIT_ASSERT_MESSAGE(path, waitForEvent(zk, &lctx[i], 5));
evt = lctx[i].getEvent();
CPPUNIT_ASSERT_EQUAL_MESSAGE(evt.path, ZOO_CREATED_EVENT, evt.type);
CPPUNIT_ASSERT_EQUAL(string(path), evt.path);
}
}
// Shared scenario for the watcher auto-reset tests.
//
// zk        - client handle the scenario runs on
// ctxGlobal - context used to wait for each disconnect (and for the second
//             reconnect)
// ctxLocal  - context on which all watch events are expected; also used to
//             wait for the first and third reconnect
//
// When ctxGlobal and ctxLocal are the same object the watches are requested
// with the watch flag of zoo_get_children/zoo_get/zoo_exists; otherwise they
// are registered explicitly with `watcher` and ctxLocal through the zoo_w*
// variants.
//
// The scenario sets child/data/exists watches under /watchtest, restarts the
// server, and checks that subsequent changes still produce the expected
// events; it then re-registers the watches, restarts again and checks the
// delete events.
void testWatcherAutoReset(zhandle_t *zk, watchctx_t *ctxGlobal,
                          watchctx_t *ctxLocal)
{
    bool isGlobal = (ctxGlobal == ctxLocal);
    int rc;
    struct Stat stat;
    char buf[1024];
    int blen;
    struct String_vector strings;
    const char *testName;

    rc = zoo_create(zk, "/watchtest", "", 0,
                    &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    rc = zoo_create(zk, "/watchtest/child", "", 0,
                    &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

    if (isGlobal) {
        testName = "GlobalTest";
        rc = zoo_get_children(zk, "/watchtest", 1, &strings);
        // Check rc before freeing: `strings` has no initializer, so it must
        // not be deallocated unless the call reported success.
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
        deallocate_String_vector(&strings);
        blen = sizeof(buf);
        rc = zoo_get(zk, "/watchtest/child", 1, buf, &blen, &stat);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
        rc = zoo_exists(zk, "/watchtest/child2", 1, &stat);
        CPPUNIT_ASSERT_EQUAL((int)ZNONODE, rc);
    } else {
        testName = "LocalTest";
        rc = zoo_wget_children(zk, "/watchtest", watcher, ctxLocal,
                               &strings);
        // Same ordering as above: assert success, then release the vector.
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
        deallocate_String_vector(&strings);
        blen = sizeof(buf);
        rc = zoo_wget(zk, "/watchtest/child", watcher, ctxLocal,
                      buf, &blen, &stat);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
        rc = zoo_wexists(zk, "/watchtest/child2", watcher, ctxLocal,
                         &stat);
        CPPUNIT_ASSERT_EQUAL((int)ZNONODE, rc);
    }

    // Registering the watches must not have produced any event yet.
    CPPUNIT_ASSERT(ctxLocal->countEvents() == 0);

    stopServer();
    CPPUNIT_ASSERT_MESSAGE(testName, ctxGlobal->waitForDisconnected(zk));
    startServer();
    CPPUNIT_ASSERT_MESSAGE(testName, ctxLocal->waitForConnected(zk));

    // The restart on its own must not have produced any watch event either.
    CPPUNIT_ASSERT(ctxLocal->countEvents() == 0);

    rc = zoo_set(zk, "/watchtest/child", "1", 1, -1);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    struct Stat stat1, stat2;
    rc = zoo_set2(zk, "/watchtest/child", "1", 1, -1, &stat1);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    CPPUNIT_ASSERT(stat1.version >= 0);
    // Versioned updates chained on the Stat returned by the previous set.
    rc = zoo_set2(zk, "/watchtest/child", "1", 1, stat1.version, &stat2);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    rc = zoo_set(zk, "/watchtest/child", "1", 1, stat2.version);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    rc = zoo_create(zk, "/watchtest/child2", "", 0,
                    &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);

    CPPUNIT_ASSERT_MESSAGE(testName, waitForEvent(zk, ctxLocal, 5));

    evt_t evt = ctxLocal->getEvent();
    CPPUNIT_ASSERT_EQUAL_MESSAGE(evt.path, ZOO_CHANGED_EVENT, evt.type);
    CPPUNIT_ASSERT_EQUAL(string("/watchtest/child"), evt.path);

    CPPUNIT_ASSERT_MESSAGE(testName, waitForEvent(zk, ctxLocal, 5));
    // The create will trigger the get children and the
    // exists watches
    evt = ctxLocal->getEvent();
    CPPUNIT_ASSERT_EQUAL_MESSAGE(evt.path, ZOO_CREATED_EVENT, evt.type);
    CPPUNIT_ASSERT_EQUAL(string("/watchtest/child2"), evt.path);
    CPPUNIT_ASSERT_MESSAGE(testName, waitForEvent(zk, ctxLocal, 5));
    evt = ctxLocal->getEvent();
    CPPUNIT_ASSERT_EQUAL_MESSAGE(evt.path, ZOO_CHILD_EVENT, evt.type);
    CPPUNIT_ASSERT_EQUAL(string("/watchtest"), evt.path);

    // Make sure Pings aren't giving us problems
    sleep(5);
    CPPUNIT_ASSERT(ctxLocal->countEvents() == 0);

    stopServer();
    CPPUNIT_ASSERT_MESSAGE(testName, ctxGlobal->waitForDisconnected(zk));
    startServer();
    CPPUNIT_ASSERT_MESSAGE(testName, ctxGlobal->waitForConnected(zk));

    // Register the watches a second time; child2 exists by now, so the
    // exists call is expected to succeed.
    if (isGlobal) {
        testName = "GlobalTest";
        rc = zoo_get_children(zk, "/watchtest", 1, &strings);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
        deallocate_String_vector(&strings);
        blen = sizeof(buf);
        rc = zoo_get(zk, "/watchtest/child", 1, buf, &blen, &stat);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
        rc = zoo_exists(zk, "/watchtest/child2", 1, &stat);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    } else {
        testName = "LocalTest";
        rc = zoo_wget_children(zk, "/watchtest", watcher, ctxLocal,
                               &strings);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
        deallocate_String_vector(&strings);
        blen = sizeof(buf);
        rc = zoo_wget(zk, "/watchtest/child", watcher, ctxLocal,
                      buf, &blen, &stat);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
        rc = zoo_wexists(zk, "/watchtest/child2", watcher, ctxLocal,
                         &stat);
        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
    }

    zoo_delete(zk, "/watchtest/child2", -1);

    CPPUNIT_ASSERT_MESSAGE(testName, waitForEvent(zk, ctxLocal, 5));

    evt = ctxLocal->getEvent();
    CPPUNIT_ASSERT_EQUAL_MESSAGE(evt.path, ZOO_DELETED_EVENT, evt.type);
    CPPUNIT_ASSERT_EQUAL(string("/watchtest/child2"), evt.path);

    CPPUNIT_ASSERT_MESSAGE(testName, waitForEvent(zk, ctxLocal, 5));
    evt = ctxLocal->getEvent();
    CPPUNIT_ASSERT_EQUAL_MESSAGE(evt.path, ZOO_CHILD_EVENT, evt.type);
    CPPUNIT_ASSERT_EQUAL(string("/watchtest"), evt.path);

    stopServer();
    CPPUNIT_ASSERT_MESSAGE(testName, ctxGlobal->waitForDisconnected(zk));
    startServer();
    CPPUNIT_ASSERT_MESSAGE(testName, ctxLocal->waitForConnected(zk));

    zoo_delete(zk, "/watchtest/child", -1);
    zoo_delete(zk, "/watchtest", -1);

    CPPUNIT_ASSERT_MESSAGE(testName, waitForEvent(zk, ctxLocal, 5));

    evt = ctxLocal->getEvent();
    CPPUNIT_ASSERT_EQUAL_MESSAGE(evt.path, ZOO_DELETED_EVENT, evt.type);
    CPPUNIT_ASSERT_EQUAL(string("/watchtest/child"), evt.path);

    // Make sure nothing is straggling
    sleep(1);
    CPPUNIT_ASSERT(ctxLocal->countEvents() == 0);
}
void testWatcherAutoResetWithGlobal()
{
{
watchctx_t ctx;
zhandle_t *zk = createClient(&ctx);
int rc = zoo_create(zk, "/testarwg", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_create(zk, "/testarwg/arwg", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
}
{
watchctx_t ctx;
zhandle_t *zk = createchClient(&ctx, "127.0.0.1:22181/testarwg/arwg");
testWatcherAutoReset(zk, &ctx, &ctx);
}
}
void testWatcherAutoResetWithLocal()
{
{
watchctx_t ctx;
zhandle_t *zk = createClient(&ctx);
int rc = zoo_create(zk, "/testarwl", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
rc = zoo_create(zk, "/testarwl/arwl", "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
}
{
watchctx_t ctx;
watchctx_t lctx;
zhandle_t *zk = createchClient(&ctx, "127.0.0.1:22181/testarwl/arwl");
testWatcherAutoReset(zk, &ctx, &lctx);
}
}
void testLastZxid() {
// ZOOKEEPER-1417: Test that c-client only update last zxid upon
// receiving request response only.
const int timeout = 5000;
int rc;
watchctx_t ctx1, ctx2;
zhandle_t *zk1 = createClient(&ctx1);
zhandle_t *zk2 = createClient(&ctx2);
CPPUNIT_ASSERT(zk1);
CPPUNIT_ASSERT(zk2);
int64_t original = zk2->last_zxid;
// Create txn to increase system zxid
rc = zoo_create(zk1, "/lastzxid", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
// This should be enough time for zk2 to receive ping request
usleep(timeout/2 * 1000);
// check that zk1's last zxid is updated
struct Stat stat;
rc = zoo_exists(zk1, "/lastzxid", 0, &stat);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
CPPUNIT_ASSERT_EQUAL((int64_t) zk1->last_zxid, stat.czxid);
// zk2's last zxid should remain the same
CPPUNIT_ASSERT_EQUAL(original, (int64_t) zk2->last_zxid);
// Perform read and also register a watch
rc = zoo_wexists(zk2, "/lastzxid", watcher, &ctx2, &stat);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
int64_t updated = zk2->last_zxid;
// check that zk2's last zxid is updated
CPPUNIT_ASSERT_EQUAL(updated, stat.czxid);
// Increment system zxid again
rc = zoo_set(zk1, "/lastzxid", NULL, -1, -1);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
// Wait for zk2 to get watch event
CPPUNIT_ASSERT(waitForEvent(zk2, &ctx2, 5));
// zk2's last zxid should remain the same
CPPUNIT_ASSERT_EQUAL(updated, (int64_t) zk2->last_zxid);
}
// Watcher callback used by testRemoveWatchers: ignores every argument and
// bumps the shared `count` so the test can tell whether it was invoked.
static void watcher_rw(zhandle_t *zh, int type, int state,
                       const char *path, void *ctx) {
    ++count;
}
// Second watcher callback with the same body as watcher_rw; presumably kept
// as a separate function so that two different watchers can be registered on
// one path. Ignores every argument and bumps the shared `count`.
static void watcher_rw2(zhandle_t *zh, int type, int state,
                        const char *path, void *ctx) {
    ++count;
}
void testRemoveWatchers() {
const char *path = "/something";
char buf[1024];
int blen = sizeof(buf);
int rc;
watchctx_t ctx;
zhandle_t *zk;
/* setup path */
zk = createClient(&ctx);
CPPUNIT_ASSERT(zk);
rc = zoo_create(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
rc = zoo_create(zk, "/something2", "", 0,
&ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
/* remove all watchers */
count = 0;
rc = zoo_wget(zk, path, watcher_rw, NULL, buf, &blen, NULL);
rc = zoo_wget(zk, path, watcher_rw2, NULL, buf, &blen, NULL);
rc = zoo_remove_all_watches(zk, path, ZWATCHTYPE_ANY, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
rc = zoo_set(zk, path, "nowatch", 7, -1);
CPPUNIT_ASSERT(count == 0);
/* remove a specific watcher before it's added (should fail) */
rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
watcher_rw, NULL, 0);
CPPUNIT_ASSERT_EQUAL((int)ZNOWATCHER, rc);
/* now add a specific watcher and then remove it */
rc = zoo_wget(zk, path, watcher_rw, NULL,
buf, &blen, NULL);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
watcher_rw, NULL, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
/* ditto for children watcher */
rc = zoo_remove_watches(zk, path, ZWATCHTYPE_CHILD,
watcher_rw, NULL, 0);
CPPUNIT_ASSERT_EQUAL((int)ZNOWATCHER, rc);
struct String_vector str_vec = {0, NULL};
rc = zoo_wget_children(zk, path, watcher_rw, NULL,
&str_vec);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
rc = zoo_remove_watches(zk, path, ZWATCHTYPE_CHILD,
watcher_rw, NULL, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
/* add a watch, stop the server, and have remove fail */
rc = zoo_wget(zk, path, watcher_rw, NULL,
buf, &blen, NULL);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
stopServer();
ctx.waitForDisconnected(zk);
rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
watcher_rw, NULL, 0);
CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS, rc);
zookeeper_close(zk);
/* bring the server back */
startServer();
zk = createClient(&ctx);
/* add a watch, stop the server, and remove it locally */
void* ctx1=(void*)0x1;
void* ctx2=(void*)0x2;
rc = zoo_wget(zk, path, watcher_rw, ctx1,
buf, &blen, NULL);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
rc = zoo_wget(zk, "/something2", watcher_rw, ctx2,
buf, &blen, NULL);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
stopServer();
rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
watcher_rw, ctx1, 1);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
watcher_rw, ctx1, 1);
CPPUNIT_ASSERT_EQUAL((int)ZNOWATCHER, rc);
rc = zoo_remove_watches(zk, "/something2", ZWATCHTYPE_DATA,
watcher_rw, ctx2, 1);
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
}
};
// Out-of-class definitions of the static data members of Zookeeper_simpleSystem.

// Counter incremented by the watcher_rw/watcher_rw2 callbacks and reset and
// checked by the tests.
volatile int Zookeeper_simpleSystem::count;
// Handle assigned by testAsyncWatcherAutoReset before it issues its async
// calls (presumably read by the completion callbacks -- TODO confirm).
zhandle_t *Zookeeper_simpleSystem::async_zk;
// host:port string of the test server; presumably the connect string used by
// createClient() -- verify against its definition.
const char Zookeeper_simpleSystem::hostPorts[] = "127.0.0.1:22181";
// Registers the suite with CppUnit's test factory registry.
CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_simpleSystem);
#endif