| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #ifndef THREADED |
| #define THREADED |
| #endif |
| |
| #if !defined(DLL_EXPORT) && !defined(USE_STATIC_LIB) |
| # define USE_STATIC_LIB |
| #endif |
| |
| #ifndef _GNU_SOURCE |
| #define _GNU_SOURCE |
| #endif |
| |
| #include "zk_adaptor.h" |
| #include "zookeeper_log.h" |
| |
| #include <stdlib.h> |
| #include <stdio.h> |
| #include <time.h> |
| #include <fcntl.h> |
| #include <assert.h> |
| #include <errno.h> |
| |
| #ifndef WIN32 |
| #include <signal.h> |
| #include <poll.h> |
| #include <unistd.h> |
| #include <sys/time.h> |
| #endif |
| |
| int zoo_lock_auth(zhandle_t *zh) |
| { |
| return pthread_mutex_lock(&zh->auth_h.lock); |
| } |
| int zoo_unlock_auth(zhandle_t *zh) |
| { |
| return pthread_mutex_unlock(&zh->auth_h.lock); |
| } |
| int lock_buffer_list(buffer_head_t *l) |
| { |
| return pthread_mutex_lock(&l->lock); |
| } |
| int unlock_buffer_list(buffer_head_t *l) |
| { |
| return pthread_mutex_unlock(&l->lock); |
| } |
| int lock_completion_list(completion_head_t *l) |
| { |
| return pthread_mutex_lock(&l->lock); |
| } |
| int unlock_completion_list(completion_head_t *l) |
| { |
| pthread_cond_broadcast(&l->cond); |
| return pthread_mutex_unlock(&l->lock); |
| } |
| struct sync_completion *alloc_sync_completion(void) |
| { |
| struct sync_completion *sc = (struct sync_completion*)calloc(1, sizeof(struct sync_completion)); |
| if (sc) { |
| pthread_cond_init(&sc->cond, 0); |
| pthread_mutex_init(&sc->lock, 0); |
| } |
| return sc; |
| } |
| int wait_sync_completion(struct sync_completion *sc) |
| { |
| pthread_mutex_lock(&sc->lock); |
| while (!sc->complete) { |
| pthread_cond_wait(&sc->cond, &sc->lock); |
| } |
| pthread_mutex_unlock(&sc->lock); |
| return 0; |
| } |
| |
| void free_sync_completion(struct sync_completion *sc) |
| { |
| if (sc) { |
| pthread_mutex_destroy(&sc->lock); |
| pthread_cond_destroy(&sc->cond); |
| free(sc); |
| } |
| } |
| |
| void notify_sync_completion(struct sync_completion *sc) |
| { |
| pthread_mutex_lock(&sc->lock); |
| sc->complete = 1; |
| pthread_cond_broadcast(&sc->cond); |
| pthread_mutex_unlock(&sc->lock); |
| } |
| |
| int process_async(int outstanding_sync) |
| { |
| return 0; |
| } |
| |
| #ifdef WIN32 |
| unsigned __stdcall do_io( void * ); |
| unsigned __stdcall do_completion( void * ); |
| |
| int handle_error(zhandle_t* zh, SOCKET sock, char* message) |
| { |
| LOG_ERROR(LOGCALLBACK(zh), "%s. %d",message, WSAGetLastError()); |
| closesocket (sock); |
| return -1; |
| } |
| |
| //--create socket pair for interupting selects. |
| int create_socket_pair(zhandle_t* zh, SOCKET fds[2]) |
| { |
| struct sockaddr_in inaddr; |
| struct sockaddr addr; |
| int yes=1; |
| int len=0; |
| |
| SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP); |
| if (lst == INVALID_SOCKET ){ |
| LOG_ERROR(LOGCALLBACK(zh), "Error creating socket. %d",WSAGetLastError()); |
| return -1; |
| } |
| memset(&inaddr, 0, sizeof(inaddr)); |
| memset(&addr, 0, sizeof(addr)); |
| inaddr.sin_family = AF_INET; |
| inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); |
| inaddr.sin_port = 0; //--system assigns the port |
| |
| if ( setsockopt(lst,SOL_SOCKET,SO_REUSEADDR,(char*)&yes,sizeof(yes)) == SOCKET_ERROR ) { |
| return handle_error(zh, lst,"Error trying to set socket option."); |
| } |
| if (bind(lst,(struct sockaddr *)&inaddr,sizeof(inaddr)) == SOCKET_ERROR){ |
| return handle_error(zh, lst,"Error trying to bind socket."); |
| } |
| if (listen(lst,1) == SOCKET_ERROR){ |
| return handle_error(zh, lst,"Error trying to listen on socket."); |
| } |
| len=sizeof(inaddr); |
| getsockname(lst, &addr,&len); |
| fds[0]=socket(AF_INET, SOCK_STREAM,0); |
| if (connect(fds[0],&addr,len) == SOCKET_ERROR){ |
| return handle_error(zh, lst, "Error while connecting to socket."); |
| } |
| if ((fds[1]=accept(lst,0,0)) == INVALID_SOCKET){ |
| closesocket(fds[0]); |
| return handle_error(zh, lst, "Error while accepting socket connection."); |
| } |
| closesocket(lst); |
| return 0; |
| } |
| #else |
| void *do_io(void *); |
| void *do_completion(void *); |
| #endif |
| |
| |
| int wakeup_io_thread(zhandle_t *zh); |
| |
| #ifdef WIN32 |
| static int set_nonblock(SOCKET fd){ |
| ULONG nonblocking_flag = 1; |
| if (ioctlsocket(fd, FIONBIO, &nonblocking_flag) == 0) |
| return 1; |
| else |
| return -1; |
| } |
| #else |
| static int set_nonblock(int fd){ |
| long l = fcntl(fd, F_GETFL); |
| if(l & O_NONBLOCK) return 0; |
| return fcntl(fd, F_SETFL, l | O_NONBLOCK); |
| } |
| #endif |
| |
| void wait_for_others(zhandle_t* zh) |
| { |
| struct adaptor_threads* adaptor=zh->adaptor_priv; |
| pthread_mutex_lock(&adaptor->lock); |
| while(adaptor->threadsToWait>0) |
| pthread_cond_wait(&adaptor->cond,&adaptor->lock); |
| pthread_mutex_unlock(&adaptor->lock); |
| } |
| |
| void notify_thread_ready(zhandle_t* zh) |
| { |
| struct adaptor_threads* adaptor=zh->adaptor_priv; |
| pthread_mutex_lock(&adaptor->lock); |
| adaptor->threadsToWait--; |
| pthread_cond_broadcast(&adaptor->cond); |
| while(adaptor->threadsToWait>0) |
| pthread_cond_wait(&adaptor->cond,&adaptor->lock); |
| pthread_mutex_unlock(&adaptor->lock); |
| } |
| |
| |
| void start_threads(zhandle_t* zh) |
| { |
| int rc = 0; |
| struct adaptor_threads* adaptor=zh->adaptor_priv; |
| pthread_cond_init(&adaptor->cond,0); |
| pthread_mutex_init(&adaptor->lock,0); |
| adaptor->threadsToWait=2; // wait for 2 threads before opening the barrier |
| |
| // use api_prolog() to make sure zhandle doesn't get destroyed |
| // while initialization is in progress |
| api_prolog(zh); |
| LOG_DEBUG(LOGCALLBACK(zh), "starting threads..."); |
| rc=pthread_create(&adaptor->io, 0, do_io, zh); |
| assert("pthread_create() failed for the IO thread"&&!rc); |
| rc=pthread_create(&adaptor->completion, 0, do_completion, zh); |
| assert("pthread_create() failed for the completion thread"&&!rc); |
| wait_for_others(zh); |
| api_epilog(zh, 0); |
| } |
| |
| int adaptor_init(zhandle_t *zh) |
| { |
| pthread_mutexattr_t recursive_mx_attr; |
| struct adaptor_threads *adaptor_threads = calloc(1, sizeof(*adaptor_threads)); |
| if (!adaptor_threads) { |
| LOG_ERROR(LOGCALLBACK(zh), "Out of memory"); |
| return -1; |
| } |
| |
| /* We use a pipe for interrupting select() in unix/sol and socketpair in windows. */ |
| #ifdef WIN32 |
| if (create_socket_pair(zh, adaptor_threads->self_pipe) == -1){ |
| LOG_ERROR(LOGCALLBACK(zh), "Can't make a socket."); |
| #else |
| if(pipe(adaptor_threads->self_pipe)==-1) { |
| LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno); |
| #endif |
| free(adaptor_threads); |
| return -1; |
| } |
| set_nonblock(adaptor_threads->self_pipe[1]); |
| set_nonblock(adaptor_threads->self_pipe[0]); |
| |
| pthread_mutex_init(&zh->auth_h.lock,0); |
| |
| zh->adaptor_priv = adaptor_threads; |
| pthread_mutex_init(&zh->to_process.lock,0); |
| pthread_mutex_init(&adaptor_threads->zh_lock,0); |
| pthread_mutex_init(&adaptor_threads->reconfig_lock,0); |
| // to_send must be recursive mutex |
| pthread_mutexattr_init(&recursive_mx_attr); |
| pthread_mutexattr_settype(&recursive_mx_attr, PTHREAD_MUTEX_RECURSIVE); |
| pthread_mutex_init(&zh->to_send.lock,&recursive_mx_attr); |
| pthread_mutexattr_destroy(&recursive_mx_attr); |
| |
| pthread_mutex_init(&zh->sent_requests.lock,0); |
| pthread_cond_init(&zh->sent_requests.cond,0); |
| pthread_mutex_init(&zh->completions_to_process.lock,0); |
| pthread_cond_init(&zh->completions_to_process.cond,0); |
| start_threads(zh); |
| return 0; |
| } |
| |
| void adaptor_finish(zhandle_t *zh) |
| { |
| struct adaptor_threads *adaptor_threads; |
| // make sure zh doesn't get destroyed until after we're done here |
| api_prolog(zh); |
| adaptor_threads = zh->adaptor_priv; |
| if(adaptor_threads==0) { |
| api_epilog(zh,0); |
| return; |
| } |
| |
| if(!pthread_equal(adaptor_threads->io,pthread_self())){ |
| wakeup_io_thread(zh); |
| pthread_join(adaptor_threads->io, 0); |
| }else |
| pthread_detach(adaptor_threads->io); |
| |
| if(!pthread_equal(adaptor_threads->completion,pthread_self())){ |
| pthread_mutex_lock(&zh->completions_to_process.lock); |
| pthread_cond_broadcast(&zh->completions_to_process.cond); |
| pthread_mutex_unlock(&zh->completions_to_process.lock); |
| pthread_join(adaptor_threads->completion, 0); |
| }else |
| pthread_detach(adaptor_threads->completion); |
| |
| api_epilog(zh,0); |
| } |
| |
| void adaptor_destroy(zhandle_t *zh) |
| { |
| struct adaptor_threads *adaptor = zh->adaptor_priv; |
| if(adaptor==0) return; |
| |
| pthread_cond_destroy(&adaptor->cond); |
| pthread_mutex_destroy(&adaptor->lock); |
| pthread_mutex_destroy(&zh->to_process.lock); |
| pthread_mutex_destroy(&zh->to_send.lock); |
| pthread_mutex_destroy(&zh->sent_requests.lock); |
| pthread_cond_destroy(&zh->sent_requests.cond); |
| pthread_mutex_destroy(&zh->completions_to_process.lock); |
| pthread_cond_destroy(&zh->completions_to_process.cond); |
| pthread_mutex_destroy(&adaptor->zh_lock); |
| |
| pthread_mutex_destroy(&zh->auth_h.lock); |
| |
| close(adaptor->self_pipe[0]); |
| close(adaptor->self_pipe[1]); |
| free(adaptor); |
| zh->adaptor_priv=0; |
| } |
| |
| int wakeup_io_thread(zhandle_t *zh) |
| { |
| struct adaptor_threads *adaptor_threads = zh->adaptor_priv; |
| char c=0; |
| #ifndef WIN32 |
| return write(adaptor_threads->self_pipe[1],&c,1)==1? ZOK: ZSYSTEMERROR; |
| #else |
| return send(adaptor_threads->self_pipe[1], &c, 1, 0)==1? ZOK: ZSYSTEMERROR; |
| #endif |
| } |
| |
| int adaptor_send_queue(zhandle_t *zh, int timeout) |
| { |
| if(!zh->close_requested) |
| return wakeup_io_thread(zh); |
| // don't rely on the IO thread to send the messages if the app has |
| // requested to close |
| return flush_send_queue(zh, timeout); |
| } |
| |
| /* These two are declared here because we will run the event loop |
| * and not the client */ |
| #ifdef WIN32 |
| int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest, |
| struct timeval *tv); |
| #else |
| int zookeeper_interest(zhandle_t *zh, int *fd, int *interest, |
| struct timeval *tv); |
| #endif |
| int zookeeper_process(zhandle_t *zh, int events); |
| |
| #ifdef WIN32 |
| unsigned __stdcall do_io( void * v) |
| #else |
| void *do_io(void *v) |
| #endif |
| { |
| zhandle_t *zh = (zhandle_t*)v; |
| #ifndef WIN32 |
| struct pollfd fds[2]; |
| struct adaptor_threads *adaptor_threads = zh->adaptor_priv; |
| |
| api_prolog(zh); |
| notify_thread_ready(zh); |
| LOG_DEBUG(LOGCALLBACK(zh), "started IO thread"); |
| fds[0].fd=adaptor_threads->self_pipe[0]; |
| fds[0].events=POLLIN; |
| while(!zh->close_requested) { |
| zh->io_count++; |
| struct timeval tv; |
| int fd; |
| int interest; |
| int timeout; |
| int maxfd=1; |
| |
| zookeeper_interest(zh, &fd, &interest, &tv); |
| if (fd != -1) { |
| fds[1].fd=fd; |
| fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0; |
| fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0; |
| maxfd=2; |
| } |
| timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000); |
| |
| poll(fds,maxfd,timeout); |
| if (fd != -1) { |
| interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0; |
| interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0; |
| } |
| if(fds[0].revents&POLLIN){ |
| // flush the pipe |
| char b[128]; |
| while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){} |
| } |
| #else |
| fd_set rfds, wfds; |
| struct adaptor_threads *adaptor_threads = zh->adaptor_priv; |
| api_prolog(zh); |
| notify_thread_ready(zh); |
| LOG_DEBUG(LOGCALLBACK(zh), "started IO thread"); |
| |
| while(!zh->close_requested) { |
| struct timeval tv; |
| SOCKET fd; |
| int interest; |
| int rc; |
| |
| zookeeper_interest(zh, &fd, &interest, &tv); |
| |
| // FD_ZERO is cheap on Win32, it just sets count of elements to zero. |
| // It needs to be done to ensure no stale entries. |
| FD_ZERO(&rfds); |
| FD_ZERO(&wfds); |
| |
| if (fd != -1) { |
| if (interest&ZOOKEEPER_READ) { |
| FD_SET(fd, &rfds); |
| } |
| |
| if (interest&ZOOKEEPER_WRITE) { |
| FD_SET(fd, &wfds); |
| } |
| } |
| |
| // Always interested in self_pipe. |
| FD_SET(adaptor_threads->self_pipe[0], &rfds); |
| |
| rc = select(/* unused */0, &rfds, &wfds, NULL, &tv); |
| if (rc > 0) { |
| interest=(FD_ISSET(fd, &rfds))? ZOOKEEPER_READ: 0; |
| interest|=(FD_ISSET(fd, &wfds))? ZOOKEEPER_WRITE: 0; |
| |
| if (FD_ISSET(adaptor_threads->self_pipe[0], &rfds)){ |
| // flush the pipe/socket |
| char b[128]; |
| while(recv(adaptor_threads->self_pipe[0],b,sizeof(b), 0)==sizeof(b)){} |
| } |
| } |
| else if (rc < 0) { |
| LOG_ERROR(LOGCALLBACK(zh), ("select() failed %d [%d].", rc, WSAGetLastError())); |
| |
| // Clear interest events for zookeeper_process if select() fails. |
| interest = 0; |
| } |
| |
| #endif |
| // dispatch zookeeper events |
| zookeeper_process(zh, interest); |
| // check the current state of the zhandle and terminate |
| // if it is_unrecoverable() |
| if(is_unrecoverable(zh)) |
| break; |
| } |
| api_epilog(zh, 0); |
| LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated"); |
| return 0; |
| } |
| |
| #ifdef WIN32 |
| unsigned __stdcall do_completion( void * v) |
| #else |
| void *do_completion(void *v) |
| #endif |
| { |
| zhandle_t *zh = v; |
| api_prolog(zh); |
| notify_thread_ready(zh); |
| LOG_DEBUG(LOGCALLBACK(zh), "started completion thread"); |
| while(!zh->close_requested) { |
| pthread_mutex_lock(&zh->completions_to_process.lock); |
| while(!zh->completions_to_process.head && !zh->close_requested) { |
| pthread_cond_wait(&zh->completions_to_process.cond, &zh->completions_to_process.lock); |
| } |
| pthread_mutex_unlock(&zh->completions_to_process.lock); |
| process_completions(zh); |
| } |
| api_epilog(zh, 0); |
| LOG_DEBUG(LOGCALLBACK(zh), "completion thread terminated"); |
| return 0; |
| } |
| |
| int32_t inc_ref_counter(zhandle_t* zh,int i) |
| { |
| int incr=(i<0?-1:(i>0?1:0)); |
| // fetch_and_add implements atomic post-increment |
| int v=fetch_and_add(&zh->ref_counter,incr); |
| // inc_ref_counter wants pre-increment |
| v+=incr; // simulate pre-increment |
| return v; |
| } |
| |
| int32_t fetch_and_add(volatile int32_t* operand, int incr) |
| { |
| #ifndef WIN32 |
| return __sync_fetch_and_add(operand, incr); |
| #else |
| return InterlockedExchangeAdd(operand, incr); |
| #endif |
| } |
| |
| // make sure the static xid is initialized before any threads started |
| __attribute__((constructor)) int32_t get_xid() |
| { |
| static int32_t xid = -1; |
| if (xid == -1) { |
| xid = time(0); |
| } |
| return fetch_and_add(&xid,1); |
| } |
| |
| int lock_reconfig(struct _zhandle *zh) |
| { |
| struct adaptor_threads *adaptor = zh->adaptor_priv; |
| if (adaptor) { |
| return pthread_mutex_lock(&adaptor->reconfig_lock); |
| } else { |
| return 0; |
| } |
| } |
| int unlock_reconfig(struct _zhandle *zh) |
| { |
| struct adaptor_threads *adaptor = zh->adaptor_priv; |
| if (adaptor) { |
| return pthread_mutex_unlock(&adaptor->reconfig_lock); |
| } else { |
| return 0; |
| } |
| } |
| |
| int enter_critical(zhandle_t* zh) |
| { |
| struct adaptor_threads *adaptor = zh->adaptor_priv; |
| if (adaptor) { |
| return pthread_mutex_lock(&adaptor->zh_lock); |
| } else { |
| return 0; |
| } |
| } |
| |
| int leave_critical(zhandle_t* zh) |
| { |
| struct adaptor_threads *adaptor = zh->adaptor_priv; |
| if (adaptor) { |
| return pthread_mutex_unlock(&adaptor->zh_lock); |
| } else { |
| return 0; |
| } |
| } |