blob: 9d858e27dbf325aedb321e1fa10eae1db02c9be6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "zk_hashtable.h"
#include "zk_adaptor.h"
#include "hashtable/hashtable.h"
#include "hashtable/hashtable_itr.h"
#include <string.h>
#include <stdlib.h>
#include <assert.h>
typedef struct _watcher_object {
watcher_fn watcher;
void* context;
struct _watcher_object* next;
} watcher_object_t;
struct _zk_hashtable {
struct hashtable* ht;
};
struct watcher_object_list {
watcher_object_t* head;
};
/* the following functions are for testing only */
typedef struct hashtable hashtable_impl;
hashtable_impl* getImpl(zk_hashtable* ht){
return ht->ht;
}
watcher_object_t* getFirstWatcher(zk_hashtable* ht,const char* path)
{
watcher_object_list_t* wl=hashtable_search(ht->ht,(void*)path);
if(wl!=0)
return wl->head;
return 0;
}
/* end of testing functions */
watcher_object_t* clone_watcher_object(watcher_object_t* wo)
{
watcher_object_t* res=calloc(1,sizeof(watcher_object_t));
assert(res);
res->watcher=wo->watcher;
res->context=wo->context;
return res;
}
static unsigned int string_hash_djb2(void *str)
{
unsigned int hash = 5381;
int c;
const char* cstr = (const char*)str;
while ((c = *cstr++))
hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
return hash;
}
static int string_equal(void *key1,void *key2)
{
return strcmp((const char*)key1,(const char*)key2)==0;
}
static watcher_object_t* create_watcher_object(watcher_fn watcher,void* ctx)
{
watcher_object_t* wo=calloc(1,sizeof(watcher_object_t));
assert(wo);
wo->watcher=watcher;
wo->context=ctx;
return wo;
}
static watcher_object_list_t* create_watcher_object_list(watcher_object_t* head)
{
watcher_object_list_t* wl=calloc(1,sizeof(watcher_object_list_t));
assert(wl);
wl->head=head;
return wl;
}
static void destroy_watcher_object_list(watcher_object_list_t* list)
{
watcher_object_t* e = NULL;
if(list==0)
return;
e=list->head;
while(e!=0){
watcher_object_t* this=e;
e=e->next;
free(this);
}
free(list);
}
zk_hashtable* create_zk_hashtable()
{
struct _zk_hashtable *ht=calloc(1,sizeof(struct _zk_hashtable));
assert(ht);
ht->ht=create_hashtable(32,string_hash_djb2,string_equal);
return ht;
}
static void do_clean_hashtable(zk_hashtable* ht)
{
struct hashtable_itr *it;
int hasMore;
if(hashtable_count(ht->ht)==0)
return;
it=hashtable_iterator(ht->ht);
do {
watcher_object_list_t* w=hashtable_iterator_value(it);
destroy_watcher_object_list(w);
hasMore=hashtable_iterator_remove(it);
} while(hasMore);
free(it);
}
void destroy_zk_hashtable(zk_hashtable* ht)
{
if(ht!=0){
do_clean_hashtable(ht);
hashtable_destroy(ht->ht,0);
free(ht);
}
}
// searches for a watcher object instance in a watcher object list;
// two watcher objects are equal if their watcher function and context pointers
// are equal
static watcher_object_t* search_watcher(watcher_object_list_t** wl,watcher_object_t* wo)
{
watcher_object_t* wobj=(*wl)->head;
while(wobj!=0){
if(wobj->watcher==wo->watcher && wobj->context==wo->context)
return wobj;
wobj=wobj->next;
}
return 0;
}
static int add_to_list(watcher_object_list_t **wl, watcher_object_t *wo,
int clone)
{
if (search_watcher(wl, wo)==0) {
watcher_object_t* cloned=wo;
if (clone) {
cloned = clone_watcher_object(wo);
assert(cloned);
}
cloned->next = (*wl)->head;
(*wl)->head = cloned;
return 1;
} else if (!clone) {
// If it's here and we aren't supposed to clone, we must destroy
free(wo);
}
return 0;
}
static int do_insert_watcher_object(zk_hashtable *ht, const char *path, watcher_object_t* wo)
{
int res=1;
watcher_object_list_t* wl;
wl=hashtable_search(ht->ht,(void*)path);
if(wl==0){
int res;
/* inserting a new path element */
res=hashtable_insert(ht->ht,strdup(path),create_watcher_object_list(wo));
assert(res);
}else{
/*
* Path already exists; check if the watcher already exists.
* Don't clone the watcher since it's allocated on the heap --- avoids
* a memory leak and saves a clone operation (calloc + copy).
*/
res = add_to_list(&wl, wo, 0);
}
return res;
}
char **collect_keys(zk_hashtable *ht, int *count)
{
char **list;
struct hashtable_itr *it;
int i;
*count = hashtable_count(ht->ht);
list = calloc(*count, sizeof(char*));
it=hashtable_iterator(ht->ht);
for(i = 0; i < *count; i++) {
list[i] = strdup(hashtable_iterator_key(it));
hashtable_iterator_advance(it);
}
free(it);
return list;
}
static int insert_watcher_object(zk_hashtable *ht, const char *path,
watcher_object_t* wo)
{
int res;
res=do_insert_watcher_object(ht,path,wo);
return res;
}
static void copy_watchers(watcher_object_list_t *from, watcher_object_list_t *to, int clone)
{
watcher_object_t* wo=from->head;
while(wo){
watcher_object_t *next = wo->next;
add_to_list(&to, wo, clone);
wo=next;
}
}
static void copy_table(zk_hashtable *from, watcher_object_list_t *to) {
struct hashtable_itr *it;
int hasMore;
if(hashtable_count(from->ht)==0)
return;
it=hashtable_iterator(from->ht);
do {
watcher_object_list_t *w = hashtable_iterator_value(it);
copy_watchers(w, to, 1);
hasMore=hashtable_iterator_advance(it);
} while(hasMore);
free(it);
}
static void collect_session_watchers(zhandle_t *zh,
watcher_object_list_t **list)
{
copy_table(zh->active_node_watchers, *list);
copy_table(zh->active_exist_watchers, *list);
copy_table(zh->active_child_watchers, *list);
}
static void add_for_event(zk_hashtable *ht, char *path, watcher_object_list_t **list)
{
watcher_object_list_t* wl;
wl = (watcher_object_list_t*)hashtable_remove(ht->ht, path);
if (wl) {
copy_watchers(wl, *list, 0);
// Since we move, not clone the watch_objects, we just need to free the
// head pointer
free(wl);
}
}
static void do_foreach_watcher(watcher_object_t* wo,zhandle_t* zh,
const char* path,int type,int state)
{
// session event's don't have paths
const char *client_path =
(type != ZOO_SESSION_EVENT ? sub_string(zh, path) : path);
while(wo!=0){
wo->watcher(zh,type,state,client_path,wo->context);
wo=wo->next;
}
free_duplicate_path(client_path, path);
}
watcher_object_list_t *collectWatchers(zhandle_t *zh,int type, char *path)
{
struct watcher_object_list *list = create_watcher_object_list(0);
if(type==ZOO_SESSION_EVENT){
watcher_object_t defWatcher;
defWatcher.watcher=zh->watcher;
defWatcher.context=zh->context;
add_to_list(&list, &defWatcher, 1);
collect_session_watchers(zh, &list);
return list;
}
switch(type){
case CREATED_EVENT_DEF:
case CHANGED_EVENT_DEF:
// look up the watchers for the path and move them to a delivery list
add_for_event(zh->active_node_watchers,path,&list);
add_for_event(zh->active_exist_watchers,path,&list);
break;
case CHILD_EVENT_DEF:
// look up the watchers for the path and move them to a delivery list
add_for_event(zh->active_child_watchers,path,&list);
break;
case DELETED_EVENT_DEF:
// look up the watchers for the path and move them to a delivery list
add_for_event(zh->active_node_watchers,path,&list);
add_for_event(zh->active_exist_watchers,path,&list);
add_for_event(zh->active_child_watchers,path,&list);
break;
}
return list;
}
void deliverWatchers(zhandle_t *zh, int type,int state, char *path, watcher_object_list_t **list)
{
if (!list || !(*list)) return;
do_foreach_watcher((*list)->head, zh, path, type, state);
destroy_watcher_object_list(*list);
*list = 0;
}
void activateWatcher(zhandle_t *zh, watcher_registration_t* reg, int rc)
{
if(reg){
/* in multithreaded lib, this code is executed
* by the IO thread */
zk_hashtable *ht = reg->checker(zh, rc);
if(ht){
insert_watcher_object(ht,reg->path,
create_watcher_object(reg->watcher, reg->context));
}
}
}
/* If watcher is NULL, we return TRUE since we consider it a match */
static int containsWatcher(zk_hashtable *watchers, const char *path,
watcher_fn watcher, void *watcherCtx)
{
watcher_object_list_t *wl;
watcher_object_t e;
if (!watcher)
return 1;
wl = hashtable_search(watchers->ht, (void *)path);
if (!wl)
return 0;
e.watcher = watcher;
e.context = watcherCtx;
return search_watcher(&wl, &e) ? 1 : 0;
}
/**
* remove any watcher_object that has a matching (watcher, watcherCtx)
*/
static void removeWatcherFromList(watcher_object_list_t *wl, watcher_fn watcher,
void *watcherCtx)
{
watcher_object_t *e = NULL;
if (!wl || (wl && !wl->head))
return;
e = wl->head;
while (e){
if (e->next &&
e->next->watcher == watcher &&
e->next->context == watcherCtx) {
watcher_object_t *this = e->next;
e->next = e->next->next;
free(this);
break;
}
e = e->next;
}
if (wl->head &&
wl->head->watcher == watcher && wl->head->context == watcherCtx) {
watcher_object_t *this = wl->head;
wl->head = wl->head->next;
free(this);
}
}
static void removeWatcher(zk_hashtable *watchers, const char *path,
watcher_fn watcher, void *watcherCtx)
{
watcher_object_list_t *wl = hashtable_search(watchers->ht, (void *)path);
if (!wl)
return;
if (!watcher) {
wl = (watcher_object_list_t *) hashtable_remove(watchers->ht,
(void *)path);
destroy_watcher_object_list(wl);
return;
}
removeWatcherFromList(wl, watcher, watcherCtx);
if (!wl->head) {
wl = (watcher_object_list_t *) hashtable_remove(watchers->ht,
(void *)path);
destroy_watcher_object_list(wl);
}
}
void deactivateWatcher(zhandle_t *zh, watcher_deregistration_t *dereg, int rc)
{
if (rc != ZOK || !dereg)
return;
removeWatchers(zh, dereg->path, dereg->type, dereg->watcher,
dereg->context);
}
void removeWatchers(zhandle_t *zh, const char* path, ZooWatcherType type,
watcher_fn watcher, void *watcherCtx)
{
switch (type) {
case ZWATCHTYPE_CHILD:
removeWatcher(zh->active_child_watchers, path, watcher, watcherCtx);
break;
case ZWATCHTYPE_DATA:
removeWatcher(zh->active_node_watchers, path, watcher, watcherCtx);
removeWatcher(zh->active_exist_watchers, path, watcher, watcherCtx);
break;
case ZWATCHTYPE_ANY:
removeWatcher(zh->active_child_watchers, path, watcher, watcherCtx);
removeWatcher(zh->active_node_watchers, path, watcher, watcherCtx);
removeWatcher(zh->active_exist_watchers, path, watcher, watcherCtx);
break;
}
}
int pathHasWatcher(zhandle_t *zh, const char *path, int wtype,
watcher_fn watcher, void *watcherCtx)
{
int watcher_found = 0;
switch (wtype) {
case ZWATCHTYPE_CHILD:
watcher_found = containsWatcher(zh->active_child_watchers,
path, watcher, watcherCtx);
break;
case ZWATCHTYPE_DATA:
watcher_found = containsWatcher(zh->active_node_watchers, path,
watcher, watcherCtx);
if (!watcher_found) {
watcher_found = containsWatcher(zh->active_exist_watchers, path,
watcher, watcherCtx);
}
break;
case ZWATCHTYPE_ANY:
watcher_found = containsWatcher(zh->active_child_watchers, path,
watcher, watcherCtx);
if (!watcher_found) {
watcher_found = containsWatcher(zh->active_node_watchers, path,
watcher, watcherCtx);
}
if (!watcher_found) {
watcher_found = containsWatcher(zh->active_exist_watchers, path,
watcher, watcherCtx);
}
break;
}
return watcher_found;
}