blob: 8157472dd4be3a9b60b68ce5c91acf749d1ddf83 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ZK_ADAPTOR_H_
#define ZK_ADAPTOR_H_
#include <zookeeper.jute.h>
#ifdef THREADED
#ifndef WIN32
#include <pthread.h>
#else
#include "winport.h"
#endif
#endif
#include "zookeeper.h"
#include "zk_hashtable.h"
#include "addrvec.h"
/* predefined xid's values recognized as special by the server */
#define WATCHER_EVENT_XID -1
#define PING_XID -2
#define AUTH_XID -4
#define SET_WATCHES_XID -8
/* zookeeper state constants */
#define EXPIRED_SESSION_STATE_DEF -112
#define AUTH_FAILED_STATE_DEF -113
#define CONNECTING_STATE_DEF 1
#define ASSOCIATING_STATE_DEF 2
#define CONNECTED_STATE_DEF 3
#define READONLY_STATE_DEF 5
#define SSL_CONNECTING_STATE_DEF 7
#define NOTCONNECTED_STATE_DEF 999
/* zookeeper event type constants */
#define CREATED_EVENT_DEF 1
#define DELETED_EVENT_DEF 2
#define CHANGED_EVENT_DEF 3
#define CHILD_EVENT_DEF 4
#define SESSION_EVENT_DEF -1
#define NOTWATCHING_EVENT_DEF -2
#ifdef __cplusplus
extern "C" {
#endif
struct _buffer_list;
struct _completion_list;
typedef struct _buffer_head {
struct _buffer_list *volatile head;
struct _buffer_list *last;
#ifdef THREADED
pthread_mutex_t lock;
#endif
} buffer_head_t;
typedef struct _completion_head {
struct _completion_list *volatile head;
struct _completion_list *last;
#ifdef THREADED
pthread_cond_t cond;
pthread_mutex_t lock;
#endif
} completion_head_t;
int lock_buffer_list(buffer_head_t *l);
int unlock_buffer_list(buffer_head_t *l);
int lock_completion_list(completion_head_t *l);
int unlock_completion_list(completion_head_t *l);
struct sync_completion {
int rc;
union {
struct {
char *str;
int str_len;
} str;
struct Stat stat;
struct {
char *buffer;
int buff_len;
struct Stat stat;
} data;
struct {
struct ACL_vector acl;
struct Stat stat;
} acl;
struct String_vector strs2;
struct {
struct String_vector strs2;
struct Stat stat2;
} strs_stat;
} u;
int complete;
#ifdef THREADED
pthread_cond_t cond;
pthread_mutex_t lock;
#endif
};
typedef struct _auth_info {
int state; /* 0=>inactive, >0 => active */
char* scheme;
struct buffer auth;
void_completion_t completion;
const char* data;
struct _auth_info *next;
} auth_info;
/**
* This structure represents a packet being read or written.
*/
typedef struct _buffer_list {
char *buffer;
int len; /* This represents the length of sizeof(header) + length of buffer */
int curr_offset; /* This is the offset into the header followed by offset into the buffer */
struct _buffer_list *next;
} buffer_list_t;
/* the size of connect request */
#define HANDSHAKE_REQ_SIZE 45
/* connect request */
struct connect_req {
int32_t protocolVersion;
int64_t lastZxidSeen;
int32_t timeOut;
int64_t sessionId;
int32_t passwd_len;
char passwd[16];
char readOnly;
};
/* the connect response */
struct prime_struct {
int32_t len;
int32_t protocolVersion;
int32_t timeOut;
int64_t sessionId;
int32_t passwd_len;
char passwd[16];
char readOnly;
};
#ifdef THREADED
/* this is used by mt_adaptor internally for thread management */
struct adaptor_threads {
pthread_t io;
pthread_t completion;
int threadsToWait; // barrier
pthread_cond_t cond; // barrier's conditional
pthread_mutex_t lock; // ... and a lock
pthread_mutex_t zh_lock; // critical section lock
pthread_mutex_t reconfig_lock; // lock for reconfiguring cluster's ensemble
#ifdef WIN32
SOCKET self_pipe[2];
#else
int self_pipe[2];
#endif
};
#endif
/** the auth list for adding auth */
typedef struct _auth_list_head {
auth_info *auth;
#ifdef THREADED
pthread_mutex_t lock;
#endif
} auth_list_head_t;
/**
* This structure represents the connection to zookeeper.
*/
struct _zhandle {
zsock_t *fd;
// Hostlist and list of addresses
char *hostname; // hostname contains list of zookeeper servers to connect to
struct sockaddr_storage addr_cur; // address of server we're currently connecting/connected to
struct sockaddr_storage addr_rw_server; // address of last known read/write server found.
addrvec_t addrs; // current list of addresses we're connected to
addrvec_t addrs_old; // old list of addresses that we are no longer connected to
addrvec_t addrs_new; // new list of addresses to connect to if we're reconfiguring
int reconfig; // Are we in the process of reconfiguring cluster's ensemble
double pOld, pNew; // Probability for selecting between 'addrs_old' and 'addrs_new'
int delay;
int disable_reconnection_attempt; // When set, client will not try reconnect to a different server in
// server list. This makes a sticky server for client, and is useful
// for testing if a sticky server is required, or if client wants to
// explicitly shuffle server by calling zoo_cycle_next_server.
// The default value is 0.
watcher_fn watcher; // the registered watcher
// Message timings
struct timeval last_recv; // time last message was received
struct timeval last_send; // time last message was sent
struct timeval last_ping; // time last PING was sent
struct timeval next_deadline; // time of the next deadline
int recv_timeout; // max receive timeout for messages from server
// Buffers
buffer_list_t *input_buffer; // current buffer being read in
buffer_head_t to_process; // buffers that have been read and ready to be processed
buffer_head_t to_send; // packets queued to send
completion_head_t sent_requests; // outstanding requests
completion_head_t completions_to_process; // completions that are ready to run
int outstanding_sync; // number of outstanding synchronous requests
/* read-only mode specific fields */
struct timeval last_ping_rw; /* The last time we checked server for being r/w */
int ping_rw_timeout; /* The time that can go by before checking next server */
// State info
volatile int state; // Current zookeeper state
void *context; // client-side provided context
clientid_t client_id; // client-id
long long last_zxid; // last zookeeper ID
auth_list_head_t auth_h; // authentication data list
log_callback_fn log_callback; // Callback for logging (falls back to logging to stderr)
int io_count; // counts the number of iterations of do_io
// Primer storage
struct _buffer_list primer_buffer; // The buffer used for the handshake at the start of a connection
struct prime_struct primer_storage; // the connect response
char primer_storage_buffer[41]; // the true size of primer_storage
/* zookeeper_close is not reentrant because it de-allocates the zhandler.
* This guard variable is used to defer the destruction of zhandle till
* right before top-level API call returns to the caller */
int32_t ref_counter;
volatile int close_requested;
void *adaptor_priv;
/* Used for debugging only: non-zero value indicates the time when the zookeeper_process
* call returned while there was at least one unprocessed server response
* available in the socket recv buffer */
struct timeval socket_readable;
// Watchers
zk_hashtable* active_node_watchers;
zk_hashtable* active_exist_watchers;
zk_hashtable* active_child_watchers;
/** used for chroot path at the client side **/
char *chroot;
/** Indicates if this client is allowed to go to r/o mode */
char allow_read_only;
/** Indicates if we connected to a majority server before */
char seen_rw_server_before;
};
int adaptor_init(zhandle_t *zh);
void adaptor_finish(zhandle_t *zh);
void adaptor_destroy(zhandle_t *zh);
#if THREADED
struct sync_completion *alloc_sync_completion(void);
int wait_sync_completion(struct sync_completion *sc);
void free_sync_completion(struct sync_completion *sc);
void notify_sync_completion(struct sync_completion *sc);
#endif
int adaptor_send_queue(zhandle_t *zh, int timeout);
int process_async(int outstanding_sync);
void process_completions(zhandle_t *zh);
int flush_send_queue(zhandle_t*zh, int timeout);
char* sub_string(zhandle_t *zh, const char* server_path);
void free_duplicate_path(const char* free_path, const char* path);
int zoo_lock_auth(zhandle_t *zh);
int zoo_unlock_auth(zhandle_t *zh);
// ensemble reconfigure access guards
int lock_reconfig(struct _zhandle *zh);
int unlock_reconfig(struct _zhandle *zh);
// critical section guards
int enter_critical(zhandle_t* zh);
int leave_critical(zhandle_t* zh);
// zhandle object reference counting
void api_prolog(zhandle_t* zh);
int api_epilog(zhandle_t *zh, int rc);
int32_t get_xid();
// returns the new value of the ref counter
int32_t inc_ref_counter(zhandle_t* zh,int i);
#ifdef THREADED
// atomic post-increment
int32_t fetch_and_add(volatile int32_t* operand, int incr);
// in mt mode process session event asynchronously by the completion thread
#define PROCESS_SESSION_EVENT(zh,newstate) queue_session_event(zh,newstate)
#else
// in single-threaded mode process session event immediately
//#define PROCESS_SESSION_EVENT(zh,newstate) deliverWatchers(zh,ZOO_SESSION_EVENT,newstate,0)
#define PROCESS_SESSION_EVENT(zh,newstate) queue_session_event(zh,newstate)
#endif
#ifdef __cplusplus
}
#endif
#endif /*ZK_ADAPTOR_H_*/