| /////////////////////////////////////////////////////////////////////////////// |
| // |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| #include <iostream> |
| #include <map> |
| #include <string> |
| #include <vector> |
| |
| using namespace std; |
| |
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <arpa/inet.h>
#include <linux/unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
| |
| #include "clio.h" |
| #include "localio.h" |
| #include "common/evl_sqlog_eventnum.h" |
| #include "sqevlog/evl_sqlog_writer.h" |
| |
// Assertion macro for the local-IO layer.
// CAUTION: in NDEBUG builds without USE_ASSERT_ABORT the expression is NOT
// evaluated (((void)0)), so LIOTM_assert() must never be given an argument
// with side effects (function calls that must run).
#ifdef NDEBUG
# ifdef USE_ASSERT_ABORT
# define LIOTM_assert(exp) (void)((exp)||abort())
# else
# define LIOTM_assert(exp) ((void)0)
# endif
#else
# define LIOTM_assert(exp) (void)((exp)||(LIOTM_assert_fun(#exp, __FILE__, __LINE__, __PRETTY_FUNCTION__), 0))
#endif // NDEBUG

// Linux thread id of the calling thread (no glibc wrapper on older systems).
#define gettid() syscall(__NR_gettid)

// ENDL ends the stream line and also flushes stdout so traces appear promptly.
#define ENDL endl; fflush(stdout)
#define SUCCESS 0
#define FAILURE -1

// Set by the Local_IO_To_Monitor destructor to tell the reader thread to exit.
bool Shutdown = false;
bool Local_IO_To_Monitor::cv_trace = false;
void (*Local_IO_To_Monitor::cp_trace_cb)(const char *, const char *, va_list) = NULL;

// Fixed-size pool of nodes used to hand delivered signals from the alternate
// signal handler (altsig_sig) to the reader (altsig_sigtimedwait).
enum { ALTSIG_NODES = 1000 };
typedef struct altsig_node {
    struct altsig_node *ip_next;
    int iv_sig;             // signal number delivered
    siginfo_t iv_siginfo;   // copy of the delivered siginfo
} Altsig_Node;
// Free list (nodes available to the handler) and pending queue (delivered
// signals awaiting the reader); both protected by gv_altsig_spinlock.
Altsig_Node *gp_altsig_free_head = NULL;
Altsig_Node *gp_altsig_free_tail = NULL;
Altsig_Node *gp_altsig_queue_head = NULL;
Altsig_Node *gp_altsig_queue_tail = NULL;
Altsig_Node **gpp_altsig_nodes = NULL;
pthread_cond_t gv_altsig_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t gv_altsig_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_spinlock_t gv_altsig_spinlock;

// the global local io object. When this is set, the code in msmon.cpp
// and the shell.cxx follow the localio code path.
Local_IO_To_Monitor *gp_local_mon_io = NULL;
| |
// Assertion failure reporter: prints which program (from /proc/self/cmdline),
// pid/tid, file, line, function and expression failed, then aborts.
// Never returns.
void LIOTM_assert_fun(const char *pp_exp,
                      const char *pp_file,
                      unsigned pv_line,
                      const char *pp_fun) {
    char la_buf[BUFSIZ];
    char la_cmdline[BUFSIZ];

    // Identify the failing program by its command line.
    FILE *lp_file = fopen("/proc/self/cmdline", "r");
    char *lp_s;
    if (lp_file != NULL) {
        lp_s = fgets(la_cmdline, sizeof(la_cmdline), lp_file);
        fclose(lp_file);
    } else
        lp_s = NULL;
    if (lp_s == NULL)
        lp_s = (char *) "<unknown>"; // cast

    // snprintf (not sprintf): a long command line must not overflow la_buf.
    snprintf(la_buf, sizeof(la_buf),
             "%s (%d-%ld): %s:%u %s: Assertion '%s' failed.\n",
             lp_s,
             getpid(), gettid(),
             pp_file, pv_line, pp_fun, pp_exp);
    // Print the buffer verbatim; passing it as a printf format string would
    // misinterpret any '%' in the command line or asserted expression.
    fputs(la_buf, stderr);
    fflush(stderr);
    abort();
}
| |
| // |
| // alt signal handler |
| // |
| // get a free node, copy siginfo, queue node, signal |
| // |
| void altsig_sig(int pv_sig, siginfo_t *pp_siginfo, void *) { |
| Altsig_Node *lp_node; |
| int lv_err; |
| int lv_lerr; |
| |
| // get lock to queue |
| lv_lerr = pthread_spin_lock(&gv_altsig_spinlock); |
| LIOTM_assert(lv_lerr == 0); |
| |
| // get free node |
| lp_node = gp_altsig_free_head; |
| LIOTM_assert(lp_node != NULL); |
| gp_altsig_free_head = gp_altsig_free_head->ip_next; |
| |
| // set node data |
| lp_node->ip_next = NULL; |
| lp_node->iv_sig = pv_sig; |
| memcpy(&lp_node->iv_siginfo, pp_siginfo, sizeof(lp_node->iv_siginfo)); |
| |
| // queue it up |
| if (gp_altsig_queue_tail == NULL) |
| gp_altsig_queue_head = lp_node; |
| else |
| gp_altsig_queue_tail->ip_next = lp_node; |
| gp_altsig_queue_tail = lp_node; |
| |
| // unlock |
| lv_lerr = pthread_spin_unlock(&gv_altsig_spinlock); |
| LIOTM_assert(lv_lerr == 0); |
| |
| // tell reader there's a node |
| lv_err = pthread_cond_signal(&gv_altsig_cond); |
| LIOTM_assert(lv_err == 0); |
| } |
| |
| // |
| // alt signal init |
| // |
| void altsig_init() { |
| struct sigaction lv_act; |
| int lv_err; |
| int lv_inx; |
| |
| // create free list |
| gpp_altsig_nodes = new Altsig_Node *[ALTSIG_NODES]; |
| for (lv_inx = 0; lv_inx < ALTSIG_NODES; lv_inx++) |
| gpp_altsig_nodes[lv_inx] = new Altsig_Node; |
| for (lv_inx = 0; lv_inx < ALTSIG_NODES - 1; lv_inx++) |
| gpp_altsig_nodes[lv_inx]->ip_next = gpp_altsig_nodes[lv_inx + 1]; |
| gp_altsig_free_head = gpp_altsig_nodes[0]; |
| gp_altsig_free_tail = gpp_altsig_nodes[ALTSIG_NODES - 1]; |
| |
| // init spinlock |
| lv_err = pthread_spin_init(&gv_altsig_spinlock, 0); |
| if (lv_err) |
| abort(); |
| |
| // setup signal handler |
| memset(&lv_act, 0, sizeof(struct sigaction)); |
| lv_act.sa_sigaction = altsig_sig; |
| lv_act.sa_flags = SA_SIGINFO; |
| lv_err = sigaction(SQ_LIO_SIGNAL_REQUEST_REPLY, &lv_act, NULL); |
| if (lv_err) |
| abort(); |
| } |
| |
| // |
| // alt sigtimedwait - act like regular sigtimedwait, but use private queue/cv |
| // |
| int altsig_sigtimedwait(const sigset_t *, siginfo_t *pp_info, const struct timespec *pp_timeout) { |
| Altsig_Node *lp_node; |
| int lv_lerr; |
| int lv_err; |
| int lv_ret; |
| |
| // get lock to check for node |
| lv_lerr = pthread_spin_lock(&gv_altsig_spinlock); |
| LIOTM_assert(lv_lerr == 0); |
| |
| lp_node = gp_altsig_queue_head; |
| if (lp_node == NULL) { |
| // no node, give up lock |
| lv_lerr = pthread_spin_unlock(&gv_altsig_spinlock); |
| LIOTM_assert(lv_lerr == 0); |
| |
| // wait |
| lv_err = pthread_cond_timedwait(&gv_altsig_cond, &gv_altsig_mutex, pp_timeout); |
| |
| // get lock |
| lv_lerr = pthread_spin_lock(&gv_altsig_spinlock); |
| LIOTM_assert(lv_lerr == 0); |
| |
| // check pthread_cond_timedwait outcome |
| switch (lv_err) { |
| case 0: |
| lp_node = gp_altsig_queue_head; |
| break; |
| case ETIMEDOUT: |
| lp_node = gp_altsig_queue_head; // check it even if it timedout |
| break; |
| default: |
| LIOTM_assert(lv_err == 0); // pthread_cond_timedwait returns 0/ETIMEDOUT/EINVAL/EPERM |
| break; |
| } |
| } |
| |
| if (lp_node == NULL) { |
| lv_ret = -1; |
| errno = EAGAIN; |
| } else { |
| lv_ret = 0; |
| // fix queue-head/tail |
| gp_altsig_queue_head = lp_node->ip_next; |
| if (gp_altsig_queue_head == NULL) |
| gp_altsig_queue_tail = NULL; |
| // copy info |
| memcpy(pp_info, &lp_node->iv_siginfo, sizeof(lp_node->iv_siginfo)); |
| // put node on free list |
| lp_node->ip_next = NULL; |
| if (gp_altsig_free_tail == NULL) |
| gp_altsig_free_head = lp_node; |
| else |
| gp_altsig_free_tail->ip_next = lp_node; |
| gp_altsig_free_tail = lp_node; |
| } |
| |
| // unlock |
| lv_lerr = pthread_spin_unlock(&gv_altsig_spinlock); |
| LIOTM_assert(lv_lerr == 0); |
| |
| return lv_ret; |
| } |
| |
| // this is the real time signal processing thread for seabed and the shell |
| // that receives control messages from the monitor. |
| void *local_monitor_reader(void *pp_arg) { |
| const char *WHERE = "local_monitor_reader"; |
| bool lv_altsig; |
| bool lv_not_done; |
| int lv_ret = SUCCESS; |
| sigset_t lv_sig_set; |
| siginfo_t lv_siginfo; |
| int lv_sig; |
| struct timespec lv_tp; |
| pid_t lv_mpid = (pid_t)(long)pp_arg; |
| |
| if (gp_local_mon_io->cv_trace) |
| gp_local_mon_io->trace_where_printf(WHERE, |
| "ENTER, monitor pid = %ld\n", |
| (long) pp_arg); |
| |
| lv_altsig = gp_local_mon_io->iv_altsig; |
| // Setup signal handling |
| sigemptyset(&lv_sig_set); |
| sigaddset(&lv_sig_set, SQ_LIO_SIGNAL_REQUEST_REPLY); |
| LIOTM_assert(pthread_sigmask(SIG_BLOCK, &lv_sig_set, NULL) == 0); |
| |
| gp_local_mon_io->set_worker_thread_id(pthread_self()); |
| |
| lv_not_done = true; |
| while (lv_not_done) { |
| lv_tp.tv_sec = 0; |
| lv_tp.tv_nsec = SQ_LIO_SIGNAL_TIMEOUT; |
| if (lv_altsig) |
| lv_sig = altsig_sigtimedwait( &lv_sig_set, &lv_siginfo, &lv_tp ); |
| else |
| lv_sig = sigtimedwait( &lv_sig_set, &lv_siginfo, &lv_tp ); |
| if (lv_sig == -1) { |
| if (gp_local_mon_io->cv_trace && errno != EAGAIN) |
| gp_local_mon_io->trace_where_printf(WHERE, |
| "monitor pid = %ld, sigtimedwait errno: %d(%s)\n", |
| (long) pp_arg, errno, strerror(errno)); |
| LIOTM_assert(errno == EAGAIN || errno == EINTR); |
| // check if monitor is gone |
| if (kill(lv_mpid,0) == -1 && errno == ESRCH) { |
| if (gp_local_mon_io->cv_trace) |
| gp_local_mon_io->trace_where_printf(WHERE, |
| "monitor pid = %ld, is gone, exiting\n", (long) pp_arg); |
| gp_local_mon_io->shutdown(); |
| break; |
| } |
| } |
| else |
| { |
| // If time to exit thread, set by Local_IO_To_Monitor destructor |
| if ( Shutdown ) |
| { |
| break; |
| } |
| else |
| { |
| lv_ret = gp_local_mon_io->get_io(lv_sig, &lv_siginfo); |
| if (lv_ret == FAILURE ) { |
| |
| break; |
| } |
| } |
| } |
| } |
| |
| if (gp_local_mon_io && gp_local_mon_io->cv_trace) |
| gp_local_mon_io->trace_where_printf(WHERE, "EXIT thread\n"); |
| |
| pthread_exit((void *) errno); // cast |
| return (void *) errno; // cast |
| } |
| |
| void block_lio_signals() { |
| |
| sigset_t lv_sig_set; |
| // Setup signal handling |
| sigemptyset(&lv_sig_set); |
| sigaddset(&lv_sig_set, SQ_LIO_SIGNAL_REQUEST_REPLY); |
| LIOTM_assert(pthread_sigmask(SIG_BLOCK, &lv_sig_set, NULL) == 0); |
| } |
| |
| // the shell and seabed localio constructor |
| Local_IO_To_Monitor::Local_IO_To_Monitor(int pv_pid) |
| :ip_cshm(NULL), ip_cshm_end(NULL), iv_cmid(0), iv_qid(0), iv_port_file_tries ( 30 ) |
| { |
| const char *WHERE = "Local_IO_To_Monitor::Local_IO_To_Monitor"; |
| char *ptr; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| iv_worker_thread_id=0; |
| iv_initted=false; |
| iv_altsig=false; |
| iv_pid=pv_pid; |
| iv_verifier = gv_ms_su_verif; |
| ip_notice_cb=NULL; |
| ip_event_cb=NULL; |
| ip_recv_cb=NULL; |
| ip_unsol_cb=NULL; |
| iv_monitor_down=true; |
| iv_mpid=0; |
| iv_client_buffers_max=SQ_LIO_MAX_BUFFERS; |
| |
| char la_node_name[MPI_MAX_PROCESSOR_NAME]; |
| if (gethostname(la_node_name, MPI_MAX_PROCESSOR_NAME) == -1) |
| { |
| if (cv_trace) |
| trace_where_printf(WHERE, "gethostname failed , errno=%d (%s)\n", |
| errno, strerror(errno)); |
| la_node_name[0] = '\0'; |
| } |
| |
| char *tmpptr = la_node_name; |
| while ( *tmpptr ) |
| { |
| *tmpptr = (char)tolower( *tmpptr); |
| tmpptr++; |
| } |
| |
| char *lp_nodes = getenv("SQ_VIRTUAL_NODES"); |
| if (lp_nodes != NULL) |
| { |
| iv_virtual_nodes = true; |
| iv_nodes = atoi(lp_nodes); |
| if (iv_nodes <= 0) |
| { |
| iv_nodes = 1; |
| } |
| |
| int lv_MyNID; |
| |
| if (gv_ms_su_nid < 0) |
| { // Node id not set, get node number from environment variable |
| char * lp_nid = getenv("SQ_LIO_VIRTUAL_NID"); |
| if (lp_nid != NULL) |
| { |
| lv_MyNID = atoi(lp_nid); |
| if (cv_trace) |
| trace_where_printf(WHERE, "env var SQ_LIO_VIRTUAL_NID=%s, " |
| "MyNID=%d\n", lp_nid, lv_MyNID); |
| } |
| else |
| { |
| if (cv_trace) |
| trace_where_printf(WHERE, "env var SQ_LIO_VIRTUAL_NID " |
| "not found\n"); |
| lv_MyNID = 0; |
| } |
| } |
| else |
| { // Use the globally set node id |
| lv_MyNID = gv_ms_su_nid; |
| } |
| sprintf(ip_port_fname,"%.*s/monitor.port.%d.%s", |
| (int)(sizeof(ip_port_fname)-(sizeof("/monitor.port..")+11 |
| +strlen(la_node_name))), |
| getenv("MPI_TMPDIR"),lv_MyNID,la_node_name); |
| } else |
| { |
| // It's a real cluster |
| iv_virtual_nodes = false; |
| iv_nodes = 1; |
| |
| sprintf(ip_port_fname,"%.*s/monitor.port.%s", |
| (int)(sizeof(ip_port_fname)-(sizeof("/monitor.port.")+11 |
| +strlen(la_node_name))), |
| getenv("MPI_TMPDIR"), la_node_name); |
| |
| } |
| // Assume nid zero if the global Seabed nid variable is not initialized |
| iv_nid = gv_ms_su_nid < 0 ? 0 : gv_ms_su_nid ; |
| // On a real cluster, set base the segment id to zero |
| iv_nid_base = iv_virtual_nodes ? iv_nid : 0; |
| |
| iv_recv_tid=0; |
| iv_unknown_process=false; |
| iv_shutdown=false; |
| ptr = getenv( "SQ_LIO_MAX_BUFFERS" ); |
| if (ptr) { |
| int nb = atoi( ptr ); |
| if (nb > 0) |
| { |
| iv_client_buffers_max = nb; |
| } |
| } |
| iv_acquired_buffer_count = 0; |
| iv_acquired_buffer_count_max = 0; |
| |
| // Check for override of default setting of retry count used by |
| // wait_for_monitor() |
| ptr = getenv( "SQ_LIO_PORT_FILE_TRIES" ); |
| if (ptr) { |
| iv_port_file_tries = strtol ( ptr, NULL, 10 ); |
| if ( iv_port_file_tries < 10 ) |
| iv_port_file_tries = 10; |
| } |
| |
| ptr = getenv( "SQ_LIO_CLIENT_STATS" ); |
| if (ptr && *ptr == '1') { |
| iv_stats = true; |
| } |
| else { |
| iv_stats = false; |
| } |
| // Setup thread control |
| iv_sr_signaled = false; |
| iv_sr_interrupted = false; |
| LIOTM_assert(pthread_mutex_init(&iv_sr_lock, NULL) == 0); |
| LIOTM_assert(pthread_cond_init(&iv_sr_cv, NULL) == 0); |
| iv_ev_signaled = false; |
| LIOTM_assert(pthread_mutex_init(&iv_ev_lock, NULL) == 0); |
| LIOTM_assert(pthread_cond_init(&iv_ev_cv, NULL) == 0); |
| |
| // Setup signal handling |
| // The main thread in client process cannot handle |
| // LIO real time signals |
| sigset_t lv_sig_set; |
| sigemptyset(&lv_sig_set); |
| sigaddset(&lv_sig_set, SQ_LIO_SIGNAL_REQUEST_REPLY); |
| LIOTM_assert(pthread_sigmask(SIG_BLOCK, &lv_sig_set, NULL) == 0); |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT\n"); |
| } |
| |
| // the shell and seabed localio destructor |
// the shell and seabed localio destructor
//
// Signals the reader thread to exit (via the global Shutdown flag plus an
// LIO signal), joins it, reports buffer statistics / leaks, and detaches
// the shared-memory segment.
Local_IO_To_Monitor::~Local_IO_To_Monitor() {
    const char *WHERE = "Local_IO_To_Monitor::~Local_IO_To_Monitor";

    if (cv_trace)
        trace_where_printf(WHERE, "ENTER\n");

    // local_monitor_reader checks Shutdown after every signal wakeup.
    Shutdown = iv_shutdown = true;

    if (iv_worker_thread_id != 0)
    {
        // Wake the worker so it notices Shutdown, then wait for it.
        pthread_kill(iv_worker_thread_id, SQ_LIO_SIGNAL_REQUEST_REPLY);
        int rc = pthread_join(iv_worker_thread_id, NULL);
        if (rc)
        {
            if (cv_trace)
                trace_where_printf(WHERE, "EXIT, Error= Failed waiting for thread to exit! - errno = %d (%s)\n", rc, strerror(rc));
        }
    }

    if (iv_initted)
    {
        if (iv_stats)
        {
            if (cv_trace)
            {
                trace_where_printf( WHERE, "EXIT, LIO Stats: shared buffers: inuse=%d, total=%d, acquiredMax=%d\n"
                                  , iv_acquired_buffer_count
                                  , iv_client_buffers_max
                                  , iv_acquired_buffer_count_max
                                  );
            }
        }
        // A non-zero count means buffers were acquired but never released;
        // log a warning so leaks can be diagnosed.
        if ( iv_acquired_buffer_count != 0 )
        {
            char la_buf[256];
            sprintf(la_buf, "[%s], %d local io buffers were acquired but "
                    "not released.\n",
                    WHERE, iv_acquired_buffer_count);
            log_error ( MON_CLIO_LEAK_1, SQ_LOG_WARNING, la_buf );
        }

        // detach from shared memory
        shmdt(ip_cshm);
    }

    if (cv_trace)
        trace_where_printf(WHERE, "EXIT\n");
}
| |
| int Local_IO_To_Monitor::mon_port_num() |
| { |
| #define BLOCK_SIZE 512 |
| char MonitorPort[BLOCK_SIZE]; |
| unsigned long int myPortNum = 0; |
| int rc = 0; |
| struct stat lv_stat; |
| const char *WHERE = "Local_IO_To_Monitor::mon_port_num"; |
| |
| if ( strlen(ga_ms_su_c_port) != 0 ) |
| { |
| char *lp_port = strstr(ga_ms_su_c_port, "$port#"); |
| if (lp_port) |
| { |
| lp_port += 5; |
| } |
| else |
| { |
| lp_port = strchr(ga_ms_su_c_port, ':'); |
| } |
| myPortNum = strtoul(&lp_port[1], NULL, 10); |
| |
| if (gp_local_mon_io && gp_local_mon_io->cv_trace) |
| gp_local_mon_io->trace_where_printf(WHERE, "Return, monitor port=%d (%s)\n", (int)myPortNum, &lp_port[1]); |
| |
| return (int)myPortNum; |
| } |
| |
| memset( (void *)MonitorPort, 0 , BLOCK_SIZE ); |
| memset( (void *)&lv_stat, 0 , sizeof(lv_stat) ); |
| int fd = open( ip_port_fname, O_RDONLY ); |
| if( fd == -1 ) |
| { |
| if (gp_local_mon_io && gp_local_mon_io->cv_trace) |
| gp_local_mon_io->trace_where_printf(WHERE, "Cannot open %s, errno=%d, %s\n", ip_port_fname, errno, strerror(errno)); |
| } |
| else |
| { |
| do |
| { |
| rc = stat( ip_port_fname, &lv_stat ); |
| if ( lv_stat.st_size || rc == -1 ) |
| { |
| break; |
| } |
| // we need to wait for the monitor to write the port number |
| // only in the startup case |
| usleep(25000); // 25 ms |
| } |
| while ( lv_stat.st_size == 0 && rc == 0 ); |
| |
| if ( rc == 0 ) |
| { |
| rc = (int) read( fd, MonitorPort, BLOCK_SIZE); |
| if ( rc == -1 ) |
| { |
| int err = errno; |
| if (gp_local_mon_io && gp_local_mon_io->cv_trace) |
| gp_local_mon_io->trace_where_printf(WHERE, "Cannot read %s, errno=%d, %s\n", ip_port_fname, err, strerror(err)); |
| } |
| else |
| { // Ensure port string is null terminated |
| MonitorPort[lv_stat.st_size - 1] = '\0'; |
| ip_mon_mpi_port = MonitorPort; |
| } |
| |
| char *lp_port = strstr(MonitorPort, "$port#"); |
| if (lp_port) |
| { |
| lp_port += 5; |
| } |
| else |
| { |
| lp_port = strchr(MonitorPort, ':'); |
| } |
| myPortNum = strtoul(&lp_port[1], NULL, 10); |
| } |
| close(fd); |
| } |
| |
| if (gp_local_mon_io && gp_local_mon_io->cv_trace) |
| gp_local_mon_io->trace_where_printf(WHERE, "Return, monitor port=%d\n", (int)myPortNum); |
| |
| return (int)myPortNum; |
| } |
| |
| // acquire the lock used to synchronize the threads on send/recv/notice |
| int Local_IO_To_Monitor::acquire_lock( bool pv_show ) { |
| const char *WHERE = "Local_IO_To_Monitor::acquire_lock"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace && pv_show) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| lv_ret = pthread_mutex_lock(&iv_sr_lock); |
| if ( lv_ret != 0) { |
| char la_buf[256]; |
| sprintf(la_buf, "[%s], pthread_mutex_lock returned error=%d (%s)\n", |
| WHERE, lv_ret, strerror(lv_ret)); |
| log_error ( MON_CLIO_ACQUIRE_LOCK_1, SQ_LOG_ERR, la_buf ); |
| |
| errno = lv_ret; |
| lv_ret = FAILURE; |
| } |
| else { |
| errno = 0; |
| } |
| |
| if (cv_trace && pv_show) |
| trace_where_printf(WHERE, "EXIT, ret=%d\n", lv_ret); |
| |
| return(lv_ret); |
| } |
| |
| // acquire the lock used to synchronize the threads on event |
| int Local_IO_To_Monitor::acquire_ev_lock( bool pv_show ) { |
| const char *WHERE = "Local_IO_To_Monitor::acquire_ev_lock"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace && pv_show) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| lv_ret = pthread_mutex_lock(&iv_ev_lock); |
| if ( lv_ret != 0) { |
| char la_buf[256]; |
| sprintf(la_buf, "[%s], pthread_mutex_lock returned error=%d (%s)\n", |
| WHERE, lv_ret, strerror(lv_ret)); |
| log_error ( MON_CLIO_ACQUIRE_LOCK_2, SQ_LOG_ERR, la_buf ); |
| |
| errno = lv_ret; |
| lv_ret = FAILURE; |
| } |
| else { |
| errno = 0; |
| } |
| |
| if (cv_trace && pv_show) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno)); |
| |
| return(lv_ret); |
| } |
| |
| // get a client buffer from the available pool |
| int Local_IO_To_Monitor::acquire_msg(struct message_def **pp_msg) { |
| const char *WHERE = "Local_IO_To_Monitor::acquire_msg"; |
| ClientBufferInfo lv_cbi; |
| SharedMsgDef *lp_sr_msg = NULL; |
| struct message_def *lp_msg = NULL; |
| struct msqid_ds lv_mds; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| errno = 0; |
| lv_ret = acquire_lock(); |
| if ( lv_ret == FAILURE) { |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%p\n", (void *)lp_msg); |
| *pp_msg = NULL; |
| return lv_ret; |
| } |
| |
| if (cv_trace) { |
| lv_ret = msgctl(iv_qid, IPC_STAT, &lv_mds); |
| if (lv_ret != -1) { |
| trace_where_printf(WHERE, |
| "(qid=%d) shared buffers available=%d, " |
| "acquired=%d, errno=%d\n", |
| iv_qid, (int)lv_mds.msg_qnum, |
| iv_acquired_buffer_count, errno); |
| } |
| } |
| // get a client buffer from the available pool |
| lv_ret = (int) msgrcv( iv_qid, &lv_cbi, |
| sizeof( lv_cbi.index ), |
| SQ_LIO_NORMAL_MSG, IPC_NOWAIT ); |
| if (lv_ret == -1) { |
| lv_ret = errno; |
| char la_buf[256]; |
| sprintf(la_buf, "[%s], msgrcv() failed getting buffer from shared pool, errno=%d (%s)\n", WHERE, errno, strerror(errno)); |
| log_error ( MON_CLIO_ACQUIRE_MSG_1, SQ_LOG_ERR, la_buf ); |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "%s", la_buf); |
| |
| *pp_msg = NULL; |
| LIOTM_assert(release_lock() == SUCCESS); |
| return lv_ret; |
| } |
| |
| LIOTM_assert(lv_ret == sizeof( lv_cbi.index )); |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "dequeued shared buffer, idx=%d\n", lv_cbi.index); |
| |
| lp_sr_msg = (SharedMsgDef *)(ip_cshm+sizeof(SharedMemHdr) |
| +(lv_cbi.index*sizeof(SharedMsgDef))); |
| memset( (void*)&lp_sr_msg->trailer, 0, sizeof(lp_sr_msg->trailer) ); |
| lp_sr_msg->trailer.index = lv_cbi.index; |
| lp_sr_msg->trailer.OSPid = iv_pid; |
| lp_sr_msg->trailer.verifier = iv_verifier; |
| lp_sr_msg->trailer.bufInUse = iv_pid; |
| clock_gettime(CLOCK_REALTIME, &lp_sr_msg->trailer.timestamp); |
| |
| if (cv_trace) { |
| ++iv_acquired_buffer_count; |
| iv_acquired_buffer_count_max = |
| iv_acquired_buffer_count > iv_acquired_buffer_count_max |
| ? iv_acquired_buffer_count : iv_acquired_buffer_count_max; |
| |
| lv_ret = msgctl(iv_qid, IPC_STAT, &lv_mds); |
| if (lv_ret != -1) { |
| trace_where_printf(WHERE, |
| "(qid=%d) shared buffers available=%d, " |
| "acquired=%d, errno=%d\n", |
| iv_qid, (int)lv_mds.msg_qnum, |
| iv_acquired_buffer_count, errno); |
| } |
| } |
| |
| if (cv_trace) |
| trace_print_msg(WHERE, lp_sr_msg); |
| |
| lv_ret = release_lock(); |
| LIOTM_assert(lv_ret == SUCCESS); |
| |
| if (iv_unknown_process) |
| { |
| LIOTM_assert(iv_unknown_process); |
| } |
| else |
| lp_msg = &lp_sr_msg->msg; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%p\n", (void *) lp_msg); |
| |
| *pp_msg = lp_msg; |
| return lv_ret; |
| } |
| |
| |
| // called from the monitor reader thread when a rt signal is received |
| // from the monitor |
| int Local_IO_To_Monitor::get_io(int pv_sig, siginfo_t *pp_siginfo) { |
| const char *WHERE = "Local_IO_To_Monitor::get_io"; |
| int lv_ret = SUCCESS; |
| int lv_idx; |
| SharedMsgDef *lv_m; |
| |
| // type is the control message from the monitor (low order byte) |
| MonitorCtlType lv_type = (MonitorCtlType) (0xff & pp_siginfo->si_int); |
| // the index to client buffer is the payload shifted over 8 bits. |
| lv_idx = (pp_siginfo->si_int >> 8) & 0xfffff; |
| |
| if (cv_trace) { |
| const char *lp_type_str = get_type_str(lv_type); |
| trace_where_printf(WHERE, |
| "ENTER, got ctl msg: type=%d(%s), val=0x%lx, idx=%d, sig=%d\n", |
| lv_type, lp_type_str, (long) pp_siginfo->si_int, lv_idx, pv_sig); |
| } |
| |
| if (lv_idx < 0 || lv_idx > iv_client_buffers_max) { |
| if (cv_trace) { |
| trace_where_printf(WHERE, |
| "ENTER, shared buffer index out of range: idx=%d(0x%x), max=%d\n", |
| lv_idx, lv_idx, iv_client_buffers_max); |
| } |
| return lv_ret; |
| } |
| |
| lv_m = (SharedMsgDef *)(ip_cshm+sizeof(SharedMemHdr) |
| +(lv_idx*sizeof(SharedMsgDef))); |
| |
| // Bug catcher: shared buffer pid and verifier must match my process |
| LIOTM_assert((iv_pid ==-1 && iv_verifier ==-1) || |
| (iv_pid == lv_m->trailer.OSPid && iv_verifier == -1) || |
| (iv_pid == lv_m->trailer.OSPid && |
| iv_verifier == lv_m->trailer.verifier) || |
| (lv_m->trailer.OSPid == BCAST_PID && |
| lv_m->trailer.verifier == -1)); |
| |
| switch (lv_type) { |
| case MC_ReadySend: |
| if (cv_trace) |
| trace_where_printf(WHERE, "Got Ready Send msg\n"); |
| // the monitor has told us that it no longer needs this client |
| // buffer and that the monitor has no reply data, so release it |
| lv_ret = release_msg(&lv_m->msg); |
| break; |
| |
| case MC_NoticeReady: |
| if (cv_trace) |
| trace_where_printf(WHERE, "Got notice msg\n"); |
| // A notice is available to the client |
| lv_ret = process_notice(&lv_m->msg); |
| if ( lv_ret == SUCCESS) { |
| { |
| // Tell the monitor we don't need the shared buffer any more |
| if (cv_trace) |
| trace_where_printf(WHERE, "sending Notice Clear to Monitor, buffer #%d\n", |
| lv_m->trailer.index); |
| if (lv_idx == lv_m->trailer.index) |
| lv_ret = send_ctl_msg( MC_NoticeClear, lv_m->trailer.index ); |
| } |
| } |
| break; |
| |
| case MC_SReady: |
| if (cv_trace) |
| trace_where_printf(WHERE, "Got Recv msg\n"); |
| // got a request or a reply from the monitor, |
| // signal IO completion to waiting thread |
| lv_ret = acquire_lock(); |
| if ( lv_ret == SUCCESS) { |
| // tell the waiting thread of the completion |
| lv_m->trailer.received = true; |
| lv_ret = signal_cv(); |
| |
| |
| if ( lv_ret == SUCCESS) { |
| lv_ret = release_lock(); |
| LIOTM_assert(lv_ret == SUCCESS); |
| } |
| else { |
| LIOTM_assert(release_lock() == SUCCESS); |
| } |
| } |
| |
| break; |
| |
| |
| default: |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "This is a strange place to be, type: %d\n", lv_type); |
| break; |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d\n", lv_ret); |
| |
| return lv_ret; |
| } |
| |
| const char *Local_IO_To_Monitor::get_type_str(int pv_type) { |
| const char *lp_type_str; |
| switch (pv_type) |
| { |
| case MC_ReadySend: |
| lp_type_str = "ReadySend"; |
| break; |
| case MC_NoticeReady: |
| lp_type_str = "NoticeReady"; |
| break; |
| case MC_SReady: |
| lp_type_str = "SReady"; |
| break; |
| case MC_AttachStartup: |
| lp_type_str = "AttachStartup"; |
| break; |
| case MC_NoticeClear: |
| lp_type_str = "NoticeClear"; |
| break; |
| default: |
| lp_type_str = "<unknown>"; |
| break; |
| } |
| return lp_type_str; |
| } |
| |
| // this initialization routine will start the reader thread |
// this initialization routine will start the reader thread
//
// Attaches to the monitor's shared segment/message queue (with retries),
// optionally installs the alternate signal path, and spawns
// local_monitor_reader. NOTE(review): the return convention is unusual -
// true is returned when local IO was just initialized here OR when the
// monitor is down; false when already initialized with the monitor up, or
// on thread-creation failure. Verify against callers before changing.
bool Local_IO_To_Monitor::init_comm(bool altsig) {
    const char *WHERE = "Local_IO_To_Monitor::init_comm";
    int retries = 4;
    bool lv_lio_init = false;
    bool lv_ret = false; // assume failure

    if (cv_trace)
        trace_where_printf(WHERE, "ENTER, initted: %d, down %d\n",
                           iv_initted, iv_monitor_down);

    errno = 0;
    iv_altsig = altsig;
    if (iv_initted)
    {
        if (cv_trace)
            trace_where_printf(WHERE, "local IO already initialized\n");
        if (is_monitor_up())
        {
            if (cv_trace)
                trace_where_printf(WHERE, "monitor is up\n");
        }
        else
        {
            if (cv_trace)
                trace_where_printf(WHERE, "monitor is down\n");
            lv_ret = true;
        }
    }
    else
    {
        if (cv_trace)
            trace_where_printf(WHERE, "initializing local IO\n");
        // Retry attach: the monitor may not have created the segment yet.
        do
        {
            if ( init_local_IO() )
            {
                lv_lio_init = true;
                break;
            }
            // we need to wait for the monitor to write the port number
            // only in the startup case
            --retries;
            sleep(1);
        }
        while (retries);
        if (lv_lio_init)
        {
            if (is_monitor_up())
            {
                if (cv_trace)
                    trace_where_printf(WHERE, "monitor is up\n");
                if (altsig)
                    altsig_init();
                // create client worker thread
                int lv_rc = pthread_create(&iv_recv_tid,
                                           NULL,
                                           local_monitor_reader,
                                           (void *) (long) iv_mpid);
                if ( lv_rc != 0) {
                    errno = lv_rc;
                    lv_ret = false;
                }
                else
                    lv_ret = true; // indicate just initialized
            }
            else
            {
                if (cv_trace)
                    trace_where_printf(WHERE, "monitor is down\n");
                lv_ret = true;
            }
        }
    }

    if (cv_trace)
        trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno));

    return (lv_ret);
}
| |
| // check if the local monitor is up |
// Attach to the monitor's local-IO resources: look up the shared-memory
// segment and SysV message queue keyed by node + monitor port, attach the
// segment, and read the monitor's pid from its header. Sets iv_initted and
// returns true on success; on any failure returns false with errno set.
bool Local_IO_To_Monitor::init_local_IO() {
    const char *WHERE = "Local_IO_To_Monitor::init_local_IO";
    int lv_errno;
    bool lv_ret = false; // assume failure

    if (cv_trace)
        trace_where_printf(WHERE, "ENTER\n");

    iv_initted = false;
    errno = 0;

    if (cv_trace && iv_virtual_nodes)
        trace_where_printf(WHERE, "virtual nodes=%d\n", iv_nodes);

    // Segment key: node base in the high nibble, a 0x10000 bias, plus the
    // monitor's port number in the low 16 bits. Must match the monitor's
    // key computation.
    key_t lv_sharedSegKey = (iv_nid_base << 28)+ 0x10000
        + (mon_port_num() & 0xFFFF);

    if (cv_trace)
        trace_where_printf(WHERE, "nodes=%d, nid=%d, gv_ms_su_nid=%d, nidBase=%d, sharedSegKey=0x%x\n"
                           , iv_nodes, iv_nid, gv_ms_su_nid, iv_nid_base,
                           lv_sharedSegKey);

    size_t lv_shsize = (iv_client_buffers_max * sizeof( SharedMsgDef )) +
        sizeof( SharedMemHdr );
    long enableHugePages;
    int lv_shmFlag = SQ_LIO_SHM_PERMISSIONS;
    char *envShmHugePages = getenv("SQ_ENABLE_HUGEPAGES");

    if (envShmHugePages != NULL)
    {
        enableHugePages = (long) atoi(envShmHugePages);
        if (enableHugePages > 0)
        {
            // Round it 2 MB - Huge page size
            // Possible get the HugePageSize and adjust it accordingly
            // NOTE(review): >>22<<22 actually rounds down to a 4 MB (2^22)
            // boundary, not 2 MB as the comment says; the monitor must size
            // the segment the same way, so confirm both sides before
            // changing this expression.
            lv_shsize = (lv_shsize + (2*1024*1024)) >> 22 << 22;
            if (lv_shsize == 0)
                lv_shsize = 2 *1024 *1024;
            lv_shmFlag = lv_shmFlag | SHM_HUGETLB;
        }
    }
    // Look up (not create) the monitor's existing segment.
    iv_cmid = shmget( lv_sharedSegKey, lv_shsize, lv_shmFlag );
    if (iv_cmid == -1) {
        lv_errno = errno;
        if (cv_trace)
            trace_where_printf(WHERE, "failed shmget(0x%x,%d), errno=%d(%s)\n", lv_sharedSegKey, (int)lv_shsize, lv_errno, strerror(lv_errno));
        errno = lv_errno;
    }
    else {
        if (cv_trace)
            trace_where_printf(WHERE, "shared-memory-id=%d, size=%d, key=0x%x\n", iv_cmid, (int)lv_shsize, lv_sharedSegKey);

        ip_cshm = (char *) shmat(iv_cmid, NULL, 0); // cast
        if (ip_cshm == (void *) -1) {
            lv_errno = errno;
            perror( "failed shmat()" );
            if (cv_trace)
                trace_where_printf(WHERE, "failed shmat(%d) errno=%d(%s)\n", iv_cmid, lv_errno, strerror(lv_errno));
            errno = lv_errno;
        }
        else
        {
            // The segment header carries the monitor's pid.
            iv_mpid = ((SharedMemHdr*)ip_cshm)->mPid;
            LIOTM_assert(iv_mpid > 0);
            if (cv_trace)
                trace_where_printf(WHERE, "shared-memory=%p, monitor pid=%d, nid=%d\n"
                                   , ip_cshm, iv_mpid, iv_nid);

            iv_qid = msgget( lv_sharedSegKey, SQ_LIO_MSQ_PERMISSIONS );
            if (iv_qid == -1) {
                lv_errno = errno;
                perror( "failed msgget()" );
                if (cv_trace)
                    trace_where_printf(WHERE, "failed msgget() errno=%d(%s)\n", lv_errno, strerror(lv_errno));
                // detach from shared memory
                shmdt(ip_cshm);
                errno = lv_errno;
                ip_cshm = NULL;
                iv_qid = iv_mpid = 0;
            }
            else {
                ip_cshm_end = ip_cshm + lv_shsize;
                iv_initted = lv_ret = true;
            }
        }
    }

    if (cv_trace)
        trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno));

    return lv_ret;
}
| |
| // check if the local monitor is up |
// check if the local monitor is up
//
// Probes the monitor with kill(pid, 0), retrying up to 10 times with an
// increasing backoff and re-reading the pid from the shared-memory header
// in case the monitor restarted. On persistent failure, detaches from
// shared memory and clears the initialized state. Requires iv_initted.
bool Local_IO_To_Monitor::is_monitor_up() {
    const char *WHERE = "Local_IO_To_Monitor::is_monitor_up";
    int lv_retries = 0;
    bool lv_ret = false;

    if (cv_trace)
        trace_where_printf(WHERE, "ENTER\n");

    LIOTM_assert(iv_initted);

    if (iv_mpid)
    {
        iv_monitor_down = true;
        do
        {
            // Signal 0 checks existence without delivering anything.
            if (kill(iv_mpid,0) == -1)
            {
                if (cv_trace)
                    trace_where_printf(WHERE, "no monitor process, errno=%d(%s)\n", errno, strerror(errno));
                ++lv_retries;
                // Linear backoff: 1ms, 2ms, ... before the next probe.
                usleep( 1000 * lv_retries );
                // The monitor may have restarted with a new pid; pick up
                // the current value from the shared segment header.
                if ( iv_mpid != ((SharedMemHdr*)ip_cshm)->mPid)
                {
                    iv_mpid = ((SharedMemHdr*)ip_cshm)->mPid;
                }
            }
            else
            {
                iv_monitor_down = false;
                lv_ret = true;
            }
        }
        while( iv_monitor_down && lv_retries < 10 );
        if ( iv_monitor_down && iv_initted )
        {
            // Monitor is gone: drop our attachment so a later init_comm()
            // can re-attach cleanly.
            // detach from shared memory
            shmdt(ip_cshm);
            ip_cshm = NULL;
            iv_qid = iv_mpid = 0;
            iv_initted = false;
        }
    }
    else
    {
        if (cv_trace)
            trace_where_printf(WHERE, "monitor pid=%d\n", iv_mpid);
        iv_monitor_down = true;
        lv_ret = false;
    }

    if (cv_trace)
        trace_where_printf(WHERE, "EXIT, ret=%d\n", lv_ret);

    return lv_ret;
}
| |
| // process a notice received from the monitor |
| int Local_IO_To_Monitor::process_notice(struct message_def *pp_msg) { |
| const char *WHERE = "Local_IO_To_Monitor::process_notice"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER, msgtype: %d\n", pp_msg->type); |
| |
| switch (pp_msg->type) { |
| case MsgType_Close: |
| case MsgType_Open: |
| case MsgType_Change: |
| case MsgType_ProcessCreated: |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "calling recv callback %d received\n", |
| pp_msg->u.request.type ); |
| if (ip_recv_cb) |
| { |
| ip_recv_cb(pp_msg, size_of_msg(pp_msg)); |
| } |
| else |
| { |
| lv_ret = put_on_notice_list( pp_msg, size_of_msg(pp_msg)); |
| } |
| break; |
| |
| case MsgType_NodeDown: |
| case MsgType_NodeQuiesce: |
| case MsgType_NodePrepare: |
| case MsgType_NodeUp: |
| case MsgType_NodeJoining: |
| case MsgType_SpareUp: |
| case MsgType_ProcessDeath: |
| case MsgType_Shutdown: |
| case MsgType_TmRestarted: |
| case MsgType_TmSyncAbort: |
| case MsgType_TmSyncCommit: |
| case MsgType_ReintegrationError: |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "notice %d received\n", |
| pp_msg->u.request.type); |
| if (ip_notice_cb) |
| ip_notice_cb(pp_msg, size_of_msg(pp_msg)); |
| else |
| lv_ret = put_on_notice_list( pp_msg, size_of_msg(pp_msg)); |
| break; |
| |
| case MsgType_Event: |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "Event %d received\n", |
| pp_msg->u.request.u.event_notice.event_id); |
| if (ip_event_cb) |
| { |
| ip_event_cb(pp_msg, size_of_msg(pp_msg)); |
| } |
| else |
| { |
| lv_ret = put_on_event_list( pp_msg, size_of_msg(pp_msg)); |
| } |
| break; |
| |
| case MsgType_UnsolicitedMessage: |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "Unsolicited msg, type=%d received\n", |
| pp_msg->u.request.type); |
| if (ip_unsol_cb) |
| ip_unsol_cb(pp_msg, size_of_msg(pp_msg)); |
| else |
| lv_ret = put_on_notice_list( pp_msg, size_of_msg(pp_msg)); |
| break; |
| |
| default: |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "Invalid Notice Type(%d) received\n", |
| pp_msg->type); |
| lv_ret = FAILURE; |
| LIOTM_assert(0); |
| break; |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d\n", lv_ret); |
| |
| return lv_ret; |
| } |
| |
| // Put it on the event list to be removed with wait_for_event() |
| int Local_IO_To_Monitor::put_on_event_list( struct message_def *pp_msg, |
| int pv_size ) { |
| const char *WHERE = "Local_IO_To_Monitor::put_on_event_list"; |
| int lv_ret = SUCCESS; |
| struct message_def *lv_event = new struct message_def; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER - event size %d\n", pv_size); |
| |
| memcpy( lv_event, pp_msg, pv_size ); |
| |
| lv_ret = acquire_ev_lock(); |
| if ( lv_ret == SUCCESS) { |
| iv_event_list.push_back( lv_event ); |
| lv_ret = signal_event_cv(); |
| if ( lv_ret == SUCCESS) |
| lv_ret = release_ev_lock(); |
| else |
| release_ev_lock(); |
| } |
| else |
| { |
| delete lv_event; |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno)); |
| |
| return(lv_ret); |
| } |
| |
| // if a notice cannot be processed right away, put it on the notice list |
| // to be removed with get_notice() |
| int Local_IO_To_Monitor::put_on_notice_list( struct message_def *pp_msg, |
| int pv_size ) { |
| const char *WHERE = "Local_IO_To_Monitor::put_on_notice_list"; |
| int lv_ret = SUCCESS; |
| struct message_def *lv_notice = new struct message_def; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER - notice size %d\n", pv_size); |
| |
| memcpy( lv_notice, pp_msg, pv_size ); |
| |
| lv_ret = acquire_lock(); |
| if ( lv_ret == SUCCESS) { |
| iv_notice_list.push_back( lv_notice ); |
| lv_ret = signal_cv(); |
| if ( lv_ret == SUCCESS) |
| { |
| lv_ret = release_lock(); |
| LIOTM_assert(lv_ret == SUCCESS); |
| } |
| else |
| { |
| LIOTM_assert(release_lock() == SUCCESS); |
| } |
| } |
| else |
| { |
| delete lv_notice; |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno)); |
| |
| return(lv_ret); |
| } |
| |
// get the first notice from the list
//
// Removes the oldest queued notice and returns it in *pp_msg.  When
// 'wait' is true and the list is empty, blocks on the notice condition
// variable until a notice arrives or the wait fails; when 'wait' is
// false and the list is empty, returns immediately with *pp_msg NULL.
// The caller takes ownership of the returned heap message.
int Local_IO_To_Monitor::get_notice( struct message_def **pp_msg, bool wait ) {
    const char *WHERE = "Local_IO_To_Monitor::get_notice";
    int lv_ret = SUCCESS;
    struct message_def *lp_msg = NULL;

    if (cv_trace)
        trace_where_printf(WHERE, "ENTER\n");

    errno = 0;
    if (!iv_notice_list.empty() || wait)
    {
        lv_ret = acquire_lock();
        if ( lv_ret == SUCCESS) {
            while (iv_notice_list.empty())
            {
                if (cv_trace)
                    trace_where_printf(WHERE, "waiting for notice\n");

                lv_ret = wait_on_cv();
                if ( lv_ret != SUCCESS) {
                    break;
                }
            }
            if ( lv_ret == SUCCESS) {
                lp_msg = iv_notice_list.front();
                iv_notice_list.pop_front();
            }
            // NOTE(review): release_lock() overwrites lv_ret here, so a
            // wait_on_cv() failure (e.g. EINTR/shutdown) can still be
            // reported as SUCCESS with *pp_msg == NULL -- callers must
            // tolerate a NULL notice on a SUCCESS return.
            lv_ret = release_lock();
            LIOTM_assert(lv_ret == SUCCESS);
        }
    }

    if (lv_ret == FAILURE)
        lp_msg = NULL;

    *pp_msg = lp_msg;

    if (cv_trace)
        trace_where_printf(WHERE, "EXIT, notice=%p, ret=%d, errno=%d(%s)\n"
                          , (void *) lp_msg, lv_ret, errno, strerror(errno));

    return lv_ret;
}
| |
| |
| // release the lock used to synchronize the threads on send/recv/notice |
| int Local_IO_To_Monitor::release_lock( bool pv_show ) { |
| const char *WHERE = "Local_IO_To_Monitor::release_lock"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace && pv_show) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| lv_ret = pthread_mutex_unlock(&iv_sr_lock); |
| if ( lv_ret != 0) { |
| char la_buf[256]; |
| sprintf(la_buf, "[%s], pthread_mutex_unlock returned error=%d (%s)\n", |
| WHERE, lv_ret, strerror(lv_ret)); |
| log_error ( MON_CLIO_RELEASE_LOCK_1, SQ_LOG_ERR, la_buf ); |
| |
| errno = lv_ret; |
| lv_ret = FAILURE; |
| } |
| else { |
| errno = 0; |
| } |
| |
| if (cv_trace && pv_show) |
| trace_where_printf(WHERE, "EXIT, ret=%d\n", lv_ret); |
| |
| return lv_ret; |
| } |
| |
| // release the lock used to synchronize the threads on event |
| int Local_IO_To_Monitor::release_ev_lock( bool pv_show ) { |
| const char *WHERE = "Local_IO_To_Monitor::release_ev_lock"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace && pv_show) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| lv_ret = pthread_mutex_unlock(&iv_ev_lock); |
| if ( lv_ret != 0) { |
| char la_buf[256]; |
| sprintf(la_buf, "[%s], pthread_mutex_unlock returned error=%d (%s)\n", |
| WHERE, lv_ret, strerror(lv_ret)); |
| log_error ( MON_CLIO_RELEASE_LOCK_2, SQ_LOG_ERR, la_buf ); |
| |
| errno = lv_ret; |
| lv_ret = FAILURE; |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno)); |
| |
| return lv_ret; |
| } |
| |
| // release a client buffer back to the available pool |
| int Local_IO_To_Monitor::release_msg(struct message_def *pp_msg, |
| bool pv_lock) { |
| const char *WHERE = "Local_IO_To_Monitor::release_msg"; |
| int lv_ret = SUCCESS; |
| ClientBufferInfo lv_cbi; |
| SharedMsgDef *lp_sr_msg = reinterpret_cast<SharedMsgDef *>(pp_msg); |
| int lv_buffer_index = -1; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER, buff=%p\n", (void *) pp_msg); |
| |
| errno = 0; |
| |
| if (pv_lock) |
| { |
| lv_ret = acquire_lock(); |
| if (lv_ret == FAILURE) { |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d\n", lv_ret); |
| return(lv_ret); |
| } |
| } |
| |
| if ((char*)pp_msg >= ip_cshm && |
| (char*)pp_msg < ip_cshm_end ) |
| { // Buffer to be released is in local io buffer shared memory region |
| |
| lv_buffer_index = lp_sr_msg->trailer.index; |
| |
| if ( lp_sr_msg->trailer.bufInUse != -1 |
| && lp_sr_msg->trailer.bufInUse != iv_pid) |
| { // Unexpectedly, buffer is not owned by this process |
| char la_buf[256]; |
| sprintf(la_buf, "[%s], attempt to release buffer %d but it is not" |
| " owned by this process. Last freed by pid=%d, verifier%d\n", |
| WHERE, |
| lv_buffer_index, |
| lp_sr_msg->trailer.OSPid, |
| lp_sr_msg->trailer.verifier); |
| log_error ( MON_CLIO_RELEASE_MSG_1, SQ_LOG_ERR, la_buf ); |
| } |
| else if (lv_buffer_index >= 0 |
| && lv_buffer_index < iv_client_buffers_max) |
| { // buffer index is in valid range |
| |
| // reset trailer values |
| lp_sr_msg->trailer.received = 0; |
| lp_sr_msg->trailer.attaching = 0; |
| lp_sr_msg->trailer.bufInUse = 0; |
| lp_sr_msg->trailer.OSPid = iv_pid; // Identify releaser |
| lp_sr_msg->trailer.verifier = iv_verifier; // Identify releaser |
| |
| // put the buffer back in the shared buffer pool |
| lv_cbi.mtype = SQ_LIO_NORMAL_MSG; |
| lv_cbi.index = lv_buffer_index; |
| lv_ret = msgsnd( iv_qid, &lv_cbi, sizeof( lv_cbi.index ), 0); |
| if (lv_ret == -1) { |
| lv_ret = errno; |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "EXIT, msgsnd() failed returning buffer" |
| " to shared pool, errno=%d (%s)\n", |
| errno, strerror(errno)); |
| if (pv_lock) { |
| LIOTM_assert(release_lock() == SUCCESS); |
| } |
| return(lv_ret); |
| } |
| |
| if (cv_trace) { |
| struct msqid_ds lv_mds; |
| |
| trace_where_printf(WHERE, "returned buffer to shared pool. " |
| "idx=%d\n", lp_sr_msg->trailer.index); |
| |
| --iv_acquired_buffer_count; |
| |
| lv_ret = msgctl(iv_qid, IPC_STAT, &lv_mds); |
| if (lv_ret != -1) { |
| trace_where_printf(WHERE, |
| "(qid=%d) shared buffers available=%d, " |
| "acquired=%d, errno=%d\n", |
| iv_qid, (int)lv_mds.msg_qnum, |
| iv_acquired_buffer_count, errno); |
| } |
| } |
| |
| // todo: why is this needed? |
| lv_ret = signal_cv(); |
| } |
| else |
| { // Invalid buffer index |
| char la_buf[256]; |
| sprintf(la_buf, "[%s], attempt to release buffer %d but index is " |
| " invalid. Buffer address=%p\n", |
| WHERE, lv_buffer_index, (void *) lp_sr_msg); |
| log_error ( MON_CLIO_RELEASE_MSG_2, SQ_LOG_ERR, la_buf ); |
| } |
| } else { |
| // not local io shared memory. must be heap memory, delete it |
| if (cv_trace) { |
| trace_where_printf(WHERE, |
| "deleting buffer, buf=%p\n", (void *)pp_msg); |
| } |
| delete pp_msg; |
| } |
| |
| if (pv_lock) { |
| lv_ret = release_lock(); |
| LIOTM_assert(lv_ret == SUCCESS); |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno)); |
| |
| return lv_ret; |
| } |
| |
| // send a request to the monitor that does not expect a reply |
| int Local_IO_To_Monitor::send(struct message_def *pp_msg) { |
| const char *WHERE = "Local_IO_To_Monitor::send"; |
| int lv_ret = SUCCESS; |
| SharedMsgDef *lp_sr_msg = reinterpret_cast<SharedMsgDef *>(pp_msg); |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER, Sending req type %d\n", |
| pp_msg->u.request.type); |
| |
| errno = 0; |
| if (pp_msg != &lp_sr_msg->msg) { |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "EXIT, pp_msg did not come from 'acquire_msg()'\n"); |
| errno = EINVAL; |
| return FAILURE; |
| } |
| if (!pp_msg->noreply) { |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "EXIT, cannot use send with noreply == false\n"); |
| errno = EINVAL; |
| return FAILURE; |
| } |
| lp_sr_msg->trailer.received = false; |
| lv_ret = send_ctl_msg(MC_SReady, lp_sr_msg->trailer.index ); |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d\n", lv_ret); |
| |
| return lv_ret; |
| } |
| |
| // send an rt signal control message to the monitor |
| int Local_IO_To_Monitor::send_ctl_msg(MonitorCtlType pv_type, int pv_index) { |
| const char *WHERE = "Local_IO_To_Monitor::send_ctl_msg"; |
| int lv_ret = SUCCESS; |
| sigval lv_value; |
| SharedMsgDef *lp_msg = NULL; |
| |
| if (cv_trace) { |
| trace_where_printf(WHERE, "ENTER\n"); |
| } |
| |
| assert( pv_index >= 0 ); |
| |
| lv_value.sival_int = ((pv_index&0xfffff) << 8) | (pv_type & 0xff); |
| if ( pv_type == MC_SReady || pv_type == MC_AttachStartup ) |
| { |
| lp_msg = ((SharedMsgDef *)(ip_cshm+sizeof(SharedMemHdr)+ |
| (pv_index*sizeof(SharedMsgDef)))); |
| lp_msg->trailer.OSPid = iv_pid; |
| lp_msg->trailer.verifier = iv_verifier; |
| } |
| |
| if (cv_trace) { |
| const char *lp_type_str = get_type_str(pv_type); |
| trace_where_printf(WHERE, |
| "sending ctl msg: type=%d(%s), val=0x%x, idx=%d, pid=%d\n", |
| pv_type, lp_type_str, lv_value.sival_int, pv_index, iv_pid); |
| } |
| |
| int err; |
| do { |
| err = errno = 0; |
| lv_ret = sigqueue(iv_mpid, SQ_LIO_SIGNAL_REQUEST_REPLY, lv_value); |
| if (lv_ret != 0) { |
| err = errno; // save initial error |
| if(errno != ESRCH && errno != EAGAIN) { |
| if (cv_trace) |
| trace_where_printf(WHERE, "sigqueue() failed, ret=%d, errno=%d(%s)\n" |
| , lv_ret, err, strerror(err)); |
| } |
| } |
| } while( errno == EAGAIN ); |
| |
| if (lv_ret == -1) { |
| int rc; |
| switch (pv_type) { |
| case MC_SReady: |
| case MC_AttachStartup: |
| // Could not signal the monitor so complete the message to the client |
| // Ignore any subsequent errors |
| if (lp_msg != NULL) |
| lp_msg->trailer.received = true; |
| rc = acquire_lock(); |
| if (rc == FAILURE) { |
| break; |
| } |
| signal_cv(); |
| LIOTM_assert(release_lock() == SUCCESS); |
| break; |
| |
| default: |
| break; |
| } |
| } |
| |
| // restore initial error |
| errno = err; |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n" |
| , lv_ret, errno, strerror(errno)); |
| |
| return(lv_ret); |
| } |
| |
| // this routine sends a message to the monitor and waits for a reply |
| int Local_IO_To_Monitor::send_recv(struct message_def *pp_msg, |
| bool pv_nw) { |
| const char *WHERE = "Local_IO_To_Monitor::send_recv"; |
| int lv_ret = SUCCESS; |
| SharedMsgDef *lp_sr_msg = reinterpret_cast<SharedMsgDef *>(pp_msg); |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "Sending req type %d, idx=%d\n", |
| pp_msg->u.request.type, lp_sr_msg->trailer.index); |
| |
| if (lp_sr_msg->trailer.attaching) |
| lv_ret = send_ctl_msg(MC_AttachStartup, lp_sr_msg->trailer.index); |
| else |
| { |
| lp_sr_msg->trailer.received = false; // dg |
| lv_ret = send_ctl_msg(MC_SReady, lp_sr_msg->trailer.index); |
| } |
| |
| if ( lv_ret == SUCCESS) { |
| lv_ret = acquire_lock(); |
| |
| if ( lv_ret == SUCCESS) { |
| while (!lp_sr_msg->trailer.received && !pv_nw && !iv_shutdown) { |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "Waiting for response\n"); |
| |
| lv_ret = wait_on_cv(); |
| if ( lv_ret != SUCCESS) { |
| break; |
| } |
| else { |
| if (iv_unknown_process) { |
| if (cv_trace) |
| trace_where_printf(WHERE, |
| "got iv_unknown_process from monitor, returning NULL\n"); |
| break; |
| } |
| } |
| } |
| |
| if ( lv_ret == FAILURE || |
| iv_shutdown || |
| lp_sr_msg->trailer.received ) { |
| lp_sr_msg->trailer.received = false; |
| } |
| |
| if ( lv_ret == SUCCESS) |
| { |
| lv_ret = release_lock(); |
| LIOTM_assert(lv_ret == SUCCESS); |
| } |
| else |
| { |
| LIOTM_assert(release_lock() == SUCCESS); |
| } |
| } |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, msg=%p, ret=%d, errno=%d(%s)\n" |
| , (void *) pp_msg, lv_ret, errno, strerror(errno)); |
| return(lv_ret); |
| } |
| |
| // sets the callback routines |
| int Local_IO_To_Monitor::set_cb(CallBack pp_cb, const char *pp_type) { |
| const char *WHERE = "Local_IO_To_Monitor::set_cb"; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| if (strcasecmp(pp_type, "notice") == 0) |
| ip_notice_cb = pp_cb; |
| else if (strcasecmp(pp_type, "event") == 0) |
| ip_event_cb = pp_cb; |
| else if (strcasecmp(pp_type, "recv") == 0) |
| ip_recv_cb = pp_cb; |
| else if (strcasecmp(pp_type, "unsol") == 0) |
| ip_unsol_cb = pp_cb; |
| else { |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, invalid callback\n"); |
| errno = EINVAL; |
| return(FAILURE); |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT\n"); |
| |
| return(SUCCESS); |
| } |
| |
| // wakes up a thread waiting on the localio event cv |
| int Local_IO_To_Monitor::signal_event_cv() { |
| const char *WHERE = "Local_IO_To_Monitor::signal_event_cv"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| iv_ev_signaled = true; |
| lv_ret = pthread_cond_broadcast(&iv_ev_cv); |
| if ( lv_ret != 0) { |
| errno = lv_ret; |
| lv_ret = FAILURE; |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno)); |
| |
| return(lv_ret); |
| } |
| |
| // wakes up a thread waiting on the localio send/recv cv |
| int Local_IO_To_Monitor::signal_cv( int err ) { |
| const char *WHERE = "Local_IO_To_Monitor::signal_cv"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| if ( err == EINTR ) { |
| iv_sr_interrupted = true; |
| } |
| else { |
| iv_sr_signaled = true; |
| } |
| lv_ret = pthread_cond_broadcast(&iv_sr_cv); |
| if ( lv_ret != 0) { |
| errno = lv_ret; |
| lv_ret = FAILURE; |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno)); |
| |
| return(lv_ret); |
| } |
| |
| // end all local IO processing |
| void Local_IO_To_Monitor::shutdown( void ) { |
| const char *WHERE = "Local_IO_To_Monitor::shutdown"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| // on error from called method, just return |
| |
| errno = 0; |
| lv_ret = gp_local_mon_io->acquire_lock(); |
| if (lv_ret == FAILURE ) { |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT\n"); |
| return; |
| } |
| gp_local_mon_io->iv_shutdown = true; |
| lv_ret = gp_local_mon_io->signal_cv(); |
| if (lv_ret == FAILURE ) { |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT\n"); |
| return; |
| } |
| lv_ret = gp_local_mon_io->release_lock(); |
| if (lv_ret == FAILURE ) { |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT\n"); |
| return; |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT\n"); |
| } |
| |
// the size of the message. used to reduce the amount of copy
//
// Returns the number of bytes actually occupied by pp_msg: the fixed
// preamble (everything up to the request/reply union payload) plus the
// size of the union member selected by the message type (and, for
// MsgType_Service / MsgType_Close / MsgType_UnsolicitedMessage, by the
// 'reply' flag).  Unknown types fall back to sizeof(*pp_msg).
int Local_IO_To_Monitor::size_of_msg( struct message_def *pp_msg, bool reply) {
    const char *WHERE = "Local_IO_To_Monitor::size_of_msg";
    size_t lv_len;
    // Must use the first structure in union to get correct offset
    long lv_preamble = (long)&pp_msg->u.request.u.shutdown - (long)pp_msg;

    switch (pp_msg->type) {
    case MsgType_Change:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.change);
        break;

    case MsgType_Close:
        if (reply)
            lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.close);
        else
            lv_len = lv_preamble + sizeof(pp_msg->u.request.u.close);
        break;

    case MsgType_Event:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.event_notice);
        break;

    case MsgType_NodeDown:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.down);
        break;

    case MsgType_NodePrepare:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.prepare);
        break;

    case MsgType_NodeQuiesce:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.quiesce);
        break;

    case MsgType_NodeUp:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.up);
        break;

    case MsgType_Open:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.open);
        break;

    case MsgType_ProcessCreated:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.process_created);
        break;

    case MsgType_ProcessDeath:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.death);
        break;

    case MsgType_Shutdown:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.shutdown);
        break;

    case MsgType_TmSyncAbort:
    case MsgType_TmSyncCommit:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.tm_sync_notice);
        break;

    case MsgType_UnsolicitedMessage:
        if (reply)
            lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.unsolicited_tm_sync);
        else
            lv_len = lv_preamble + sizeof(pp_msg->u.request.u.unsolicited_tm_sync);
        break;

    case MsgType_NodeJoining:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.joining);
        break;

    case MsgType_SpareUp:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.spare_up);
        break;

    case MsgType_ReintegrationError:
        lv_len = lv_preamble + sizeof(pp_msg->u.request.u.reintegrate);
        break;

    case MsgType_Service:
        // service messages carry a second-level type in the request/reply
        if (reply) {

            switch (pp_msg->u.reply.type) {
            case ReplyType_Generic:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.generic);
                break;
            case ReplyType_Dump:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.dump);
                break;
            case ReplyType_Get:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.get);
                break;
            case ReplyType_NewProcess:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.new_process);
                break;
            case ReplyType_NodeInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.node_info);
                break;
            case ReplyType_PNodeInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.pnode_info);
                break;
            case ReplyType_ProcessInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.process_info);
                break;
            case ReplyType_Open:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.open);
                break;
            case ReplyType_OpenInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.open_info);
                break;
            case ReplyType_Startup:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.startup_info);
                break;
            case ReplyType_TmSeqNum:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.tm_seqnum);
                break;
            case ReplyType_TmSync:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.tm_sync);
                break;
            case ReplyType_TransInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.trans_info);
                break;
            case ReplyType_Mount:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.mount);
                break;
            case ReplyType_MonStats:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.mon_info);
                break;
            case ReplyType_ZoneInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.zone_info);
                break;
            default:
                // unknown reply type: be safe and use the full struct size
                lv_len = sizeof(*pp_msg);
                break;
            }
        } else {
            switch (pp_msg->u.request.type) {
            case ReqType_Close:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.close);
                break;
            case ReqType_Dump:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.dump);
                break;
            case ReqType_Event:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.event);
                break;
            case ReqType_Exit:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.exit);
                break;
            case ReqType_Get:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.get);
                break;
            case ReqType_Kill:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.kill);
                break;
            case ReqType_Mount:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.mount);
                break;
            case ReqType_NewProcess:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.new_process);
                break;
            case ReqType_NodeDown:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.down);
                break;
            case ReqType_NodeInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.node_info);
                break;
            case ReqType_NodeUp:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.up);
                break;
            case ReqType_Notice:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.event_notice);
                break;
            case ReqType_Notify:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.notify);
                break;
            case ReqType_Open:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.open);
                break;
            case ReqType_OpenInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.open_info);
                break;
            case ReqType_PNodeInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.pnode_info);
                break;
            case ReqType_ProcessInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.process_info);
                break;
            case ReqType_ProcessInfoCont:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.process_info_cont);
                break;
            case ReqType_Set:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.set);
                break;
            case ReqType_Shutdown:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.shutdown);
                break;
            case ReqType_Startup:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.startup);
                break;
            case ReqType_TmLeader:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.leader);
                break;
            case ReqType_TmReady:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.tm_ready);
                break;
            case ReqType_TmSeqNum:
                // NOTE(review): this uses the *reply* union member
                // (u.reply.u.tm_seqnum) for a request -- likely harmless if
                // both members have the same size, but confirm intent.
                lv_len = lv_preamble + sizeof(pp_msg->u.reply.u.tm_seqnum);
                break;
            case ReqType_TmSync:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.tm_sync);
                break;
            case ReqType_TransInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.trans_info);
                break;
            case ReqType_ZoneInfo:
                lv_len = lv_preamble + sizeof(pp_msg->u.request.u.zone_info);
                break;
            case ReqType_MonStats:
            default:
                lv_len = sizeof(*pp_msg);
                break;
            }
        }
        break;
    default:
        // unknown message type: be safe and use the full struct size
        lv_len = sizeof(*pp_msg);
        break;
    }

    if (cv_trace)
        trace_where_printf(WHERE, "EXIT, len=%d\n", (int) lv_len);

    return (int) lv_len;
}
| |
| // trace all message trailer info |
| void Local_IO_To_Monitor::trace_print_msg(const char *pp_where, |
| SharedMsgDef *pp_msg) { |
| trace_where_printf(pp_where, |
| "shared-msg: addr=%p, msg.type=%d, rcvd=%d, att=%d, index=%d, ospid=%d, verifier=%d\n", |
| (void *) pp_msg, |
| pp_msg->msg.type, |
| pp_msg->trailer.received, |
| pp_msg->trailer.attaching, |
| pp_msg->trailer.index, |
| pp_msg->trailer.OSPid, |
| pp_msg->trailer.verifier); |
| } |
| |
| // trace routine for localio |
| void Local_IO_To_Monitor::trace_where_printf(const char *pp_where, |
| const char *pp_format, ...) { |
| va_list lv_ap; |
| if (cp_trace_cb != NULL) { |
| va_start(lv_ap, pp_format); |
| cp_trace_cb(pp_where, pp_format, lv_ap); |
| va_end(lv_ap); |
| } |
| } |
| |
| // wait for the monitor to start |
| bool Local_IO_To_Monitor::wait_for_monitor( int & status ) { |
| const char *WHERE = "Local_IO_To_Monitor::wait_for_monitor"; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| struct stat statbuf; |
| bool monitorStarted = false; |
| int tries = 0; |
| status = 0; |
| |
| // loop, waiting for monitor port file |
| while ( !monitorStarted && tries < iv_port_file_tries) |
| { |
| sleep(1); |
| if (stat ( ip_port_fname, &statbuf) == -1) |
| { |
| status = errno; |
| if (errno == ENOENT) |
| { // Port file does not yet exist |
| if (cv_trace) |
| trace_where_printf(WHERE, "monitor port file %s does not yet exist\n", ip_port_fname); |
| } |
| else |
| { |
| if (cv_trace) |
| trace_where_printf(WHERE, "error doing stat on monitor port file %s: %d (%s)\n", ip_port_fname, errno, strerror(errno)); |
| } |
| ++tries; |
| } |
| else |
| { |
| monitorStarted = true; |
| } |
| } |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT monitor is up=%d, tries=%d\n", |
| monitorStarted, tries); |
| |
| return monitorStarted; |
| } |
| |
| // wait for an event |
| int Local_IO_To_Monitor::wait_for_event(struct message_def **pp_msg) { |
| const char *WHERE = "Local_IO_To_Monitor::wait_for_event"; |
| int lv_ret = SUCCESS; |
| struct message_def *lp_event = NULL; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| errno = 0; |
| lv_ret = acquire_ev_lock(); |
| if ( lv_ret == SUCCESS) { |
| while (iv_event_list.empty()) |
| { |
| if (cv_trace) |
| trace_where_printf(WHERE, "waiting for event\n"); |
| |
| lv_ret = wait_on_event_cv(); |
| if ( lv_ret != SUCCESS) { |
| break; |
| } |
| } |
| if ( lv_ret == SUCCESS) { |
| lp_event = iv_event_list.front(); |
| iv_event_list.pop_front(); |
| lv_ret = release_ev_lock(); |
| if ( lv_ret != SUCCESS) { |
| lp_event = NULL; |
| lv_ret = FAILURE; |
| } |
| } |
| else |
| release_ev_lock(); |
| } |
| |
| *pp_msg = lp_event; |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, event=%p, ret=%d, errno=%d(%s)\n" |
| , (void *) lp_event, lv_ret, errno, strerror(errno)); |
| return(lv_ret); |
| } |
| |
| // wait on the event cv |
| int Local_IO_To_Monitor::wait_on_event_cv() { |
| const char *WHERE = "Local_IO_To_Monitor::wait_on_event_cv"; |
| int lv_ret = SUCCESS; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "ENTER\n"); |
| |
| if (!iv_ev_signaled) { |
| lv_ret = pthread_cond_wait(&iv_ev_cv, &iv_ev_lock); |
| if ( lv_ret != 0) { |
| errno = lv_ret; |
| lv_ret = FAILURE; |
| } |
| } |
| iv_ev_signaled = false; |
| |
| if (cv_trace) |
| trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno)); |
| |
| return(lv_ret); |
| } |
| |
// wait on the localio cv
//
// Blocks on the send/recv condition variable unless a signal was
// already recorded (iv_sr_signaled).  Called with iv_sr_lock held.
// Failure modes mapped onto errno:
//   - pthread_cond_wait error -> that error code
//   - wakeup flagged as interrupt (signal_cv(EINTR)) -> EINTR
//   - shutdown in progress -> ESRCH
// Both the signaled and interrupted flags are consumed here.
int Local_IO_To_Monitor::wait_on_cv() {
    const char *WHERE = "Local_IO_To_Monitor::wait_on_cv";
    int lv_ret = SUCCESS;

    if (cv_trace)
        trace_where_printf(WHERE, "ENTER\n");

    if (!iv_sr_signaled) {
        lv_ret = pthread_cond_wait(&iv_sr_cv, &iv_sr_lock);
        if ( lv_ret != 0) {
            errno = lv_ret;
            lv_ret = FAILURE;
        }
        // an EINTR-flagged wakeup overrides a successful wait
        if ( iv_sr_interrupted ) {
            errno = EINTR;
            lv_ret = FAILURE;
        }
        iv_sr_interrupted = false;
    }
    iv_sr_signaled = false;
    // a pending shutdown converts a successful wait into a failure
    if (iv_shutdown && (lv_ret == SUCCESS)) {
        errno = ESRCH;
        lv_ret = FAILURE;
    }

    if (cv_trace)
        trace_where_printf(WHERE, "EXIT, ret=%d, errno=%d(%s)\n", lv_ret, errno, strerror(errno));

    return(lv_ret);
}
| |
// Printable names for message types, indexed by MsgType_* value.
// NOTE(review): entry order/count must stay in sync with the MsgType
// enum; the trailing "invalid" entry backs the MsgType_Invalid clamp
// in scan_liobufs().
const char * Local_IO_To_Monitor::msgTypes_[] = {
    "invalid",
    "Change",
    "Close",
    "Event",
    "NodeDown",
    "NodeJoining",
    "NodePrepare",
    "NodeQuiesce",
    "NodeUp",
    "Open",
    "ProcessCreated",
    "ProcessDeath",
    "Service",
    "SpareUp",
    "Shutdown",
    "TmSyncAbort",
    "TmSyncCommit",
    "UnsolicitedMessage",
    "ReintegrationError",
    "invalid"
};
| |
// Printable names for service request types, indexed by ReqType_*
// value.  NOTE(review): entry order/count must stay in sync with the
// ReqType enum; the trailing "invalid" entry backs the ReqType_Invalid
// clamp in scan_liobufs().
const char * Local_IO_To_Monitor::reqTypes_[] = {
    "",
    "Close",
    "Dump",
    "Event",
    "Exit",
    "Get",
    "Kill",
    "Mount",
    "NewProcess",
    "NodeDown",
    "NodeInfo",
    "NodeUp",
    "Notice",
    "Notify",
    "Open",
    "OpenInfo",
    "PhysicalNodeInfo",
    "ProcessInfo",
    "ProcessInfoCont",
    "Set",
    "Shutdown",
    "Startup",
    "Stfsd",
    "TmLeader",
    "TmReady",
    "TmSeqNum",
    "TmSync",
    "TransInfo",
    "MonStats",
    "ZoneInfo",
    "invalid"
};
| |
// Printable names for service reply types, indexed by
// (ReplyType_* - ReplyType_Generic) -- see scan_liobufs(), which
// clamps to ReplyType_Invalid and subtracts ReplyType_Generic before
// indexing.  NOTE(review): keep in sync with the ReplyType enum.
const char * Local_IO_To_Monitor::replyTypes_[] = {
    "Generic",
    "Dump",
    "Get",
    "NewProcess",
    "NodeInfo",
    "PhysicalNodeInfo",
    "ProcessInfo",
    "Open",
    "OpenInfo",
    "TmSeqNum",
    "TmSync",
    "TransInfo",
    "Stfsd",
    "Startup",
    "Mount",
    "MonStats",
    "ZoneInfo",
    "invalid"
};
| |
// Diagnostic scan of all local-io shared buffers on this node.
// Prints, for each buffer whose trailer marks it in-use, the buffer
// index, the (clamped) message/request/reply type name, the owning
// pid, and the acquisition timestamp.
void Local_IO_To_Monitor::scan_liobufs ( void ) {

    SharedMsgDef *shm;
    int msgType;
    int msgReqReplyType;

    printf ( "Scanning %d buffers on node %d\n", iv_client_buffers_max,
             iv_nid );
    for (int index=0; index < iv_client_buffers_max; ++index)
    {
        // buffers are laid out contiguously after the shared-memory header
        shm = (SharedMsgDef *)(ip_cshm+sizeof(SharedMemHdr)
                               +(index*sizeof(SharedMsgDef)));
        if ( shm->trailer.bufInUse != 0 )
        {   // Buffer is in use
            char timestring[50];
            STRCPY ( timestring, ctime ( &shm->trailer.timestamp.tv_sec ));
            timestring[strlen(timestring)-1] = '\0'; // strip ctime's newline

            msgType = shm->msg.type;

            if ( msgType == MsgType_Service )
            {   // Service request or reply
                msgReqReplyType = shm->msg.u.request.type;

                if ( msgReqReplyType < ReplyType_Generic )
                {   // A request: clamp unknown values to the "invalid" entry
                    if ( msgReqReplyType >= ReqType_Invalid)
                        msgReqReplyType = ReqType_Invalid;

                    printf( "buffer #%d, request=%s, owner=%d, acquired %s\n",
                            index, reqTypes_[msgReqReplyType],
                            shm->trailer.bufInUse, timestring);
                }
                else
                {   // A reply: clamp, then rebase onto the replyTypes_ index
                    if ( msgReqReplyType >= ReplyType_Invalid)
                        msgReqReplyType = ReplyType_Invalid;
                    msgReqReplyType -= ReplyType_Generic;

                    printf( "buffer #%d, reply=%s, owner=%d, acquired %s\n",
                            index, replyTypes_[msgReqReplyType],
                            shm->trailer.bufInUse, timestring);
                }
            }
            else
            {   // Non-service message: clamp unknown types
                if ( msgType >= MsgType_Invalid ) msgType = MsgType_Invalid;

                printf( "buffer #%d, message=%s, owner=%d, acquired %s\n",
                        index, msgTypes_[msgType],
                        shm->trailer.bufInUse, timestring);
            }
        }
    }
}
| |
| int Local_IO_To_Monitor::getCurrentBufferCount() |
| { |
| struct msqid_ds buf; |
| |
| if (msgctl(iv_qid, IPC_STAT, &buf) == -1) |
| { |
| return -1; |
| } |
| else |
| { |
| return (int)buf.msg_qnum; |
| } |
| } |