blob: e98178dc146d196869a965d473e1043aae707724 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <zookeeper.h>
#include <proto.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include <assert.h>
#ifdef YCA
#include <yca/yca.h>
#endif
#define _LL_CAST_ (long long)
static zhandle_t *zh;
static clientid_t myid;
static const char *clientIdFile = 0;
struct timeval startTime;
static char cmd[1024];
static int batchMode=0;
static int to_send=0;
static int sent=0;
static int recvd=0;
static int shutdownThisThing=0;
static __attribute__ ((unused)) void
printProfileInfo(struct timeval start, struct timeval end, int thres,
const char* msg)
{
int delay=(end.tv_sec*1000+end.tv_usec/1000)-
(start.tv_sec*1000+start.tv_usec/1000);
if(delay>thres)
fprintf(stderr,"%s: execution time=%dms\n",msg,delay);
}
static const char* state2String(int state){
if (state == 0)
return "CLOSED_STATE";
if (state == ZOO_CONNECTING_STATE)
return "CONNECTING_STATE";
if (state == ZOO_ASSOCIATING_STATE)
return "ASSOCIATING_STATE";
if (state == ZOO_CONNECTED_STATE)
return "CONNECTED_STATE";
if (state == ZOO_EXPIRED_SESSION_STATE)
return "EXPIRED_SESSION_STATE";
if (state == ZOO_AUTH_FAILED_STATE)
return "AUTH_FAILED_STATE";
return "INVALID_STATE";
}
void watcher(zhandle_t *zzh, int type, int state, const char *path,
void* context)
{
/* Be careful using zh here rather than zzh - as this may be mt code
* the client lib may call the watcher before zookeeper_init returns */
fprintf(stderr, "Watcher %d state = %s", type, state2String(state));
if (path && strlen(path) > 0) {
fprintf(stderr, " for path %s", path);
}
fprintf(stderr, "\n");
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_CONNECTED_STATE) {
const clientid_t *id = zoo_client_id(zzh);
if (myid.client_id == 0 || myid.client_id != id->client_id) {
myid = *id;
fprintf(stderr, "Got a new session id: 0x%llx\n",
_LL_CAST_ myid.client_id);
if (clientIdFile) {
FILE *fh = fopen(clientIdFile, "w");
if (!fh) {
perror(clientIdFile);
} else {
int rc = fwrite(&myid, sizeof(myid), 1, fh);
if (rc != sizeof(myid)) {
perror("writing client id");
}
fclose(fh);
}
}
}
} else if (state == ZOO_AUTH_FAILED_STATE) {
fprintf(stderr, "Authentication failure. Shutting down...\n");
zookeeper_close(zzh);
shutdownThisThing=1;
zh=0;
} else if (state == ZOO_EXPIRED_SESSION_STATE) {
fprintf(stderr, "Session expired. Shutting down...\n");
zookeeper_close(zzh);
shutdownThisThing=1;
zh=0;
}
}
}
void dumpStat(const struct Stat *stat) {
char tctimes[40];
char tmtimes[40];
time_t tctime;
time_t tmtime;
if (!stat) {
fprintf(stderr,"null\n");
return;
}
tctime = stat->ctime/1000;
tmtime = stat->mtime/1000;
fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
"\tmtime=%s\tmzxid=%llx\n"
"\tversion=%x\taversion=%x\n"
"\tephemeralOwner = %llx\n",
ctime_r(&tctime, tctimes), _LL_CAST_ stat->czxid, ctime_r(&tmtime, tmtimes),
_LL_CAST_ stat->mzxid,
(unsigned int)stat->version, (unsigned int)stat->aversion,
_LL_CAST_ stat->ephemeralOwner);
}
void my_string_completion(int rc, const char *name, const void *data) {
fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc);
if (!rc) {
fprintf(stderr, "\tname = %s\n", name);
}
if(batchMode)
shutdownThisThing=1;
}
void my_data_completion(int rc, const char *value, int value_len,
const struct Stat *stat, const void *data) {
struct timeval tv;
int sec;
int usec;
gettimeofday(&tv, 0);
sec = tv.tv_sec - startTime.tv_sec;
usec = tv.tv_usec - startTime.tv_usec;
fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000);
fprintf(stderr, "%s: rc = %d\n", (char*)data, rc);
if (value) {
fprintf(stderr, " value_len = %d\n", value_len);
assert(write(2, value, value_len) == value_len);
}
fprintf(stderr, "\nStat:\n");
dumpStat(stat);
free((void*)data);
if(batchMode)
shutdownThisThing=1;
}
void my_silent_data_completion(int rc, const char *value, int value_len,
const struct Stat *stat, const void *data) {
recvd++;
fprintf(stderr, "Data completion %s rc = %d\n",(char*)data,rc);
free((void*)data);
if (recvd==to_send) {
fprintf(stderr,"Recvd %d responses for %d requests sent\n",recvd,to_send);
if(batchMode)
shutdownThisThing=1;
}
}
void my_strings_completion(int rc, const struct String_vector *strings,
const void *data) {
struct timeval tv;
int sec;
int usec;
int i;
gettimeofday(&tv, 0);
sec = tv.tv_sec - startTime.tv_sec;
usec = tv.tv_usec - startTime.tv_usec;
fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000);
fprintf(stderr, "%s: rc = %d\n", (char*)data, rc);
if (strings)
for (i=0; i < strings->count; i++) {
fprintf(stderr, "\t%s\n", strings->data[i]);
}
free((void*)data);
gettimeofday(&tv, 0);
sec = tv.tv_sec - startTime.tv_sec;
usec = tv.tv_usec - startTime.tv_usec;
fprintf(stderr, "time = %d msec\n", sec*1000 + usec/1000);
if(batchMode)
shutdownThisThing=1;
}
void my_strings_stat_completion(int rc, const struct String_vector *strings,
const struct Stat *stat, const void *data) {
my_strings_completion(rc, strings, data);
dumpStat(stat);
if(batchMode)
shutdownThisThing=1;
}
void my_void_completion(int rc, const void *data) {
fprintf(stderr, "%s: rc = %d\n", (char*)data, rc);
free((void*)data);
if(batchMode)
shutdownThisThing=1;
}
void my_stat_completion(int rc, const struct Stat *stat, const void *data) {
fprintf(stderr, "%s: rc = %d Stat:\n", (char*)data, rc);
dumpStat(stat);
free((void*)data);
if(batchMode)
shutdownThisThing=1;
}
void my_silent_stat_completion(int rc, const struct Stat *stat,
const void *data) {
// fprintf(stderr, "State completion: [%s] rc = %d\n", (char*)data, rc);
sent++;
free((void*)data);
}
static void sendRequest(const char* data) {
zoo_aset(zh, "/od", data, strlen(data), -1, my_silent_stat_completion,
strdup("/od"));
zoo_aget(zh, "/od", 1, my_silent_data_completion, strdup("/od"));
}
void od_completion(int rc, const struct Stat *stat, const void *data) {
int i;
fprintf(stderr, "od command response: rc = %d Stat:\n", rc);
dumpStat(stat);
// send a whole bunch of requests
recvd=0;
sent=0;
to_send=200;
for (i=0; i<to_send; i++) {
char buf[4096*16];
memset(buf, -1, sizeof(buf)-1);
buf[sizeof(buf)-1]=0;
sendRequest(buf);
}
}
int startsWith(const char *line, const char *prefix) {
int len = strlen(prefix);
return strncmp(line, prefix, len) == 0;
}
static const char *hostPort;
static int verbose = 0;
void processline(char *line) {
int rc;
int async = ((line[0] == 'a') && !(startsWith(line, "addauth ")));
if (async) {
line++;
}
if (startsWith(line, "help")) {
fprintf(stderr, " create [+[e|s]] <path>\n");
fprintf(stderr, " delete <path>\n");
fprintf(stderr, " set <path> <data>\n");
fprintf(stderr, " get <path>\n");
fprintf(stderr, " ls <path>\n");
fprintf(stderr, " ls2 <path>\n");
fprintf(stderr, " sync <path>\n");
fprintf(stderr, " exists <path>\n");
fprintf(stderr, " myid\n");
fprintf(stderr, " verbose\n");
fprintf(stderr, " addauth <id> <scheme>\n");
fprintf(stderr, " quit\n");
fprintf(stderr, "\n");
fprintf(stderr, " prefix the command with the character 'a' to run the command asynchronously.\n");
fprintf(stderr, " run the 'verbose' command to toggle verbose logging.\n");
fprintf(stderr, " i.e. 'aget /foo' to get /foo asynchronously\n");
} else if (startsWith(line, "verbose")) {
if (verbose) {
verbose = 0;
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
fprintf(stderr, "logging level set to WARN\n");
} else {
verbose = 1;
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
fprintf(stderr, "logging level set to DEBUG\n");
}
} else if (startsWith(line, "get ")) {
line += 4;
if (line[0] != '/') {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
gettimeofday(&startTime, 0);
rc = zoo_aget(zh, line, 1, my_data_completion, strdup(line));
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
} else if (startsWith(line, "set ")) {
char *ptr;
line += 4;
if (line[0] != '/') {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
ptr = strchr(line, ' ');
if (!ptr) {
fprintf(stderr, "No data found after path\n");
return;
}
*ptr = '\0';
ptr++;
if (async) {
rc = zoo_aset(zh, line, ptr, strlen(ptr), -1, my_stat_completion,
strdup(line));
} else {
struct Stat stat;
rc = zoo_set2(zh, line, ptr, strlen(ptr), -1, &stat);
}
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
} else if (startsWith(line, "ls ")) {
line += 3;
if (line[0] != '/') {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
gettimeofday(&startTime, 0);
rc= zoo_aget_children(zh, line, 1, my_strings_completion, strdup(line));
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
} else if (startsWith(line, "ls2 ")) {
line += 4;
if (line[0] != '/') {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
gettimeofday(&startTime, 0);
rc= zoo_aget_children2(zh, line, 1, my_strings_stat_completion, strdup(line));
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
} else if (startsWith(line, "create ")) {
int flags = 0;
line += 7;
if (line[0] == '+') {
line++;
if (line[0] == 'e') {
flags |= ZOO_EPHEMERAL;
line++;
}
if (line[0] == 's') {
flags |= ZOO_SEQUENCE;
line++;
}
line++;
}
if (line[0] != '/') {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
fprintf(stderr, "Creating [%s] node\n", line);
// {
// struct ACL _CREATE_ONLY_ACL_ACL[] = {{ZOO_PERM_CREATE, ZOO_ANYONE_ID_UNSAFE}};
// struct ACL_vector CREATE_ONLY_ACL = {1,_CREATE_ONLY_ACL_ACL};
// rc = zoo_acreate(zh, line, "new", 3, &CREATE_ONLY_ACL, flags,
// my_string_completion, strdup(line));
// }
rc = zoo_acreate(zh, line, "new", 3, &ZOO_OPEN_ACL_UNSAFE, flags,
my_string_completion, strdup(line));
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
} else if (startsWith(line, "delete ")) {
line += 7;
if (line[0] != '/') {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
if (async) {
rc = zoo_adelete(zh, line, -1, my_void_completion, strdup(line));
} else {
rc = zoo_delete(zh, line, -1);
}
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
} else if (startsWith(line, "sync ")) {
line += 5;
if (line[0] != '/') {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
rc = zoo_async(zh, line, my_string_completion, strdup(line));
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
} else if (startsWith(line, "exists ")) {
#ifdef THREADED
struct Stat stat;
#endif
line += 7;
if (line[0] != '/') {
fprintf(stderr, "Path must start with /, found: %s\n", line);
return;
}
#ifndef THREADED
rc = zoo_aexists(zh, line, 1, my_stat_completion, strdup(line));
#else
rc = zoo_exists(zh, line, 1, &stat);
#endif
if (rc) {
fprintf(stderr, "Error %d for %s\n", rc, line);
}
} else if (strcmp(line, "myid") == 0) {
printf("session Id = %llx\n", _LL_CAST_ zoo_client_id(zh)->client_id);
} else if (strcmp(line, "reinit") == 0) {
zookeeper_close(zh);
// we can't send myid to the server here -- zookeeper_close() removes
// the session on the server. We must start anew.
zh = zookeeper_init(hostPort, watcher, 30000, 0, 0, 0);
} else if (startsWith(line, "quit")) {
fprintf(stderr, "Quitting...\n");
shutdownThisThing=1;
} else if (startsWith(line, "od")) {
const char val[]="fire off";
fprintf(stderr, "Overdosing...\n");
rc = zoo_aset(zh, "/od", val, sizeof(val)-1, -1, od_completion, 0);
if (rc)
fprintf(stderr, "od command failed: %d\n", rc);
} else if (startsWith(line, "addauth ")) {
char *ptr;
line += 8;
ptr = strchr(line, ' ');
if (ptr) {
*ptr = '\0';
ptr++;
}
zoo_add_auth(zh, line, ptr, ptr ? strlen(ptr) : 0, NULL, NULL);
}
}
int main(int argc, char **argv) {
#ifndef THREADED
fd_set rfds, wfds, efds;
int processed=0;
#endif
char buffer[4096];
char p[2048];
#ifdef YCA
char *cert=0;
char appId[64];
#endif
int bufoff = 0;
FILE *fh;
if (argc < 2) {
fprintf(stderr,
"USAGE %s zookeeper_host_list [clientid_file|cmd:(ls|ls2|create|od|...)]\n",
argv[0]);
fprintf(stderr,
"Version: ZooKeeper cli (c client) version %d.%d.%d\n",
ZOO_MAJOR_VERSION,
ZOO_MINOR_VERSION,
ZOO_PATCH_VERSION);
return 2;
}
if (argc > 2) {
if(strncmp("cmd:",argv[2],4)==0){
strcpy(cmd,argv[2]+4);
batchMode=1;
fprintf(stderr,"Batch mode: %s\n",cmd);
}else{
clientIdFile = argv[2];
fh = fopen(clientIdFile, "r");
if (fh) {
if (fread(&myid, sizeof(myid), 1, fh) != sizeof(myid)) {
memset(&myid, 0, sizeof(myid));
}
fclose(fh);
}
}
}
#ifdef YCA
strcpy(appId,"yahoo.example.yca_test");
cert = yca_get_cert_once(appId);
if(cert!=0) {
fprintf(stderr,"Certificate for appid [%s] is [%s]\n",appId,cert);
strncpy(p,cert,sizeof(p)-1);
free(cert);
} else {
fprintf(stderr,"Certificate for appid [%s] not found\n",appId);
strcpy(p,"dummy");
}
#else
strcpy(p, "dummy");
#endif
verbose = 0;
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
zoo_deterministic_conn_order(1); // enable deterministic order
hostPort = argv[1];
zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
if (!zh) {
return errno;
}
#ifdef YCA
if(zoo_add_auth(zh,"yca",p,strlen(p),0,0)!=ZOK)
return 2;
#endif
#ifdef THREADED
while(!shutdownThisThing) {
int rc;
int len = sizeof(buffer) - bufoff -1;
if (len <= 0) {
fprintf(stderr, "Can't handle lines that long!\n");
exit(2);
}
rc = read(0, buffer+bufoff, len);
if (rc <= 0) {
fprintf(stderr, "bye\n");
shutdownThisThing=1;
break;
}
bufoff += rc;
buffer[bufoff] = '\0';
while (strchr(buffer, '\n')) {
char *ptr = strchr(buffer, '\n');
*ptr = '\0';
processline(buffer);
ptr++;
memmove(buffer, ptr, strlen(ptr)+1);
bufoff = 0;
}
}
#else
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
while (!shutdownThisThing) {
int fd;
int interest;
int events;
struct timeval tv;
int rc;
zookeeper_interest(zh, &fd, &interest, &tv);
if (fd != -1) {
if (interest&ZOOKEEPER_READ) {
FD_SET(fd, &rfds);
} else {
FD_CLR(fd, &rfds);
}
if (interest&ZOOKEEPER_WRITE) {
FD_SET(fd, &wfds);
} else {
FD_CLR(fd, &wfds);
}
} else {
fd = 0;
}
FD_SET(0, &rfds);
rc = select(fd+1, &rfds, &wfds, &efds, &tv);
events = 0;
if (rc > 0) {
if (FD_ISSET(fd, &rfds)) {
events |= ZOOKEEPER_READ;
}
if (FD_ISSET(fd, &wfds)) {
events |= ZOOKEEPER_WRITE;
}
}
if(batchMode && processed==0){
//batch mode
processline(cmd);
processed=1;
}
if (FD_ISSET(0, &rfds)) {
int rc;
int len = sizeof(buffer) - bufoff -1;
if (len <= 0) {
fprintf(stderr, "Can't handle lines that long!\n");
exit(2);
}
rc = read(0, buffer+bufoff, len);
if (rc <= 0) {
fprintf(stderr, "bye\n");
break;
}
bufoff += rc;
buffer[bufoff] = '\0';
while (strchr(buffer, '\n')) {
char *ptr = strchr(buffer, '\n');
*ptr = '\0';
processline(buffer);
ptr++;
memmove(buffer, ptr, strlen(ptr)+1);
bufoff = 0;
}
}
zookeeper_process(zh, events);
}
#endif
if (to_send!=0)
fprintf(stderr,"Recvd %d responses for %d requests sent\n",recvd,sent);
zookeeper_close(zh);
return 0;
}