| /////////////////////////////////////////////////////////////////////////////// |
| // |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <mpi.h> |
| #include <semaphore.h> |
| #include <unistd.h> |
| #include <sys/time.h> |
| |
| #include "msgdef.h" |
| #include "props.h" |
| #include "localio.h" |
| #include "clio.h" |
| #include "sqevlog/evl_sqlog_writer.h" |
| #include "clusterconf.h" |
| |
| long trace_settings = 0; |
| |
| #define CYCLES_MAX 20 |
| #define TAKEOVER_1 5 |
| #define TAKEOVER_2 10 |
| #define TAKEOVER_3 15 |
| |
| // Server message tags |
| #define USER_TAG 100 |
| |
| // Server process commands |
| #define CMD_CONT 1 |
| #define CMD_END 2 |
| #define CMD_ABORT 3 |
| |
| char MyPort[MPI_MAX_PORT_NAME]; |
| char *MyName; |
| int MyRank = -1; |
| int MyPNid = -1; |
| int MyNid = -1; |
| int MyPid = -1; |
| int TestNum = 1; |
| bool death = false; |
| MPI_Comm Monitor = MPI_COMM_NULL; |
| MPI_Comm Server = MPI_COMM_NULL; |
| MPI_Errhandler CommHandler; |
| MPI_Request Request[3] = { MPI_REQUEST_NULL, MPI_REQUEST_NULL, MPI_REQUEST_NULL }; |
| int gv_ms_su_nid = -1; // Local IO nid to make compatible w/ Seabed |
| SB_Verif_Type gv_ms_su_verif = -1; |
| char ga_ms_su_c_port[MPI_MAX_PORT_NAME] = {0}; // connect |
| |
| //forward procedures |
| char *display_error (int err); |
| void flush_incoming_msgs( void ); |
| int sendrecv (MPI_Comm comm, void *sbuf, int ssize, MPI_Datatype stype, |
| void *rbuf, int rsize, MPI_Datatype rtype, int stag, int rtag, MPI_Status *status=NULL); |
| void wait_for_death_notice (void); |
| |
| FILE *shell_locio_trace_file = NULL; |
| |
| int mon_log_write(int pv_event_type, posix_sqlog_severity_t pv_severity, char *pp_string) |
| { |
| |
| pv_event_type = pv_event_type; |
| pv_severity = pv_severity; |
| int lv_err = 0; |
| |
| printf("%s", pp_string ); |
| |
| return lv_err; |
| } |
| |
| void shell_locio_trace(const char *where, const char *format, va_list ap) |
| { |
| if (shell_locio_trace_file != NULL) |
| { |
| int ms; |
| int us; |
| struct timeval t; |
| struct tm tx; |
| char buf[BUFSIZ]; |
| gettimeofday(&t, NULL); |
| localtime_r(&t.tv_sec, &tx); |
| ms = (int) t.tv_usec / 1000; |
| us = (int) t.tv_usec - ms * 1000; |
| |
| sprintf(buf, "%02d:%02d:%02d.%03d.%03d %s: (%lx)", |
| tx.tm_hour, tx.tm_min, tx.tm_sec, ms, us, where, |
| pthread_self()); |
| vsprintf(&buf[strlen(buf)], format, ap); |
| fprintf(shell_locio_trace_file, buf); |
| fflush(shell_locio_trace_file); |
| } |
| } |
| |
| char *ErrorMsg (int error_code) |
| { |
| int rc; |
| int length; |
| static char buffer[MPI_MAX_ERROR_STRING]; |
| |
| rc = MPI_Error_string (error_code, buffer, &length); |
| if (rc != MPI_SUCCESS) |
| { |
| sprintf(buffer,"MPI_Error_string: Invalid error code (%d)\n", error_code); |
| length = strlen(buffer); |
| } |
| buffer[length] = '\0'; |
| |
| return buffer; |
| } |
| |
| bool check_notice (struct message_def *notice, MPI_Status * status) |
| { |
| int count; |
| |
| MPI_Get_count (status, MPI_CHAR, &count); |
| if ((status->MPI_TAG == NOTICE_TAG) && |
| (count == sizeof (struct message_def)) && |
| (notice->u.request.type == ReqType_Notice)) |
| { |
| switch (notice->type) |
| { |
| case MsgType_Close: // process close notification |
| case MsgType_Open: // process open notification |
| printf ("[%s] Open/Close notices not currently supported.\n", |
| MyName); |
| break; |
| case MsgType_NodeDown: // node is down notification |
| case MsgType_NodeUp: // node is up notification |
| printf ("[%s] Node Up/Down notices not currently supported.\n", |
| MyName); |
| break; |
| case MsgType_ProcessDeath: // process death notification |
| printf ("[%s] ProcessDeath notice received from %d,%d.\n", |
| MyName, |
| notice->u.request.u.death.nid, |
| notice->u.request.u.death.pid); |
| death = true; |
| break; |
| case MsgType_Service: // request a service from the monitor |
| default: |
| printf ("[%s] Invalid notice message type received.\n", |
| MyName); |
| } |
| } |
| else |
| { |
| printf ("[%s] Notice size/type mismatch.\n", MyName); |
| } |
| fflush (stdout); |
| |
| return death; |
| } |
| |
| void CommRecovery (MPI_Comm * comm, int *err, ...) |
| { |
| int rc; |
| int mpi_err = *err & 0x000000FF; |
| |
| if (*comm == Server) |
| { |
| printf ("[%s] Server Communicator failed, err=%s\n", MyName, display_error(*err)); |
| fflush (stdout); |
| if( Request[0] != MPI_REQUEST_NULL ) |
| { |
| printf ("[%s] Canceling Irecv notice on monitor\n", MyName ); |
| rc = MPI_Cancel (&Request[0]); |
| if( rc != MPI_SUCCESS ) |
| { |
| printf ("[%s] Irecv Cancel on monitor failed, rc = %s\n", MyName, display_error(rc) ); |
| } |
| if( Request[0] != MPI_REQUEST_NULL ) |
| { |
| MPI_Request_free(&Request[0]); |
| } |
| Request[0] = MPI_REQUEST_NULL; |
| } |
| if( Request[1] != MPI_REQUEST_NULL ) |
| { |
| printf ("[%s] Canceling Isend on comm\n", MyName ); |
| rc = MPI_Cancel (&Request[1]); |
| if( rc != MPI_SUCCESS ) |
| { |
| printf ("[%s] Isend Cancel on comm failed, rc = %s\n", MyName, display_error(rc) ); |
| } |
| if( Request[1] != MPI_REQUEST_NULL ) |
| { |
| MPI_Request_free(&Request[1]); |
| } |
| Request[1] = MPI_REQUEST_NULL; |
| } |
| if( Request[2] != MPI_REQUEST_NULL ) |
| { |
| printf ("[%s] Canceling Irecv on comm\n", MyName ); |
| rc = MPI_Cancel (&Request[2]); |
| if( rc != MPI_SUCCESS ) |
| { |
| printf ("[%s] Irecv Cancel on comm failed, rc = %s\n", MyName, display_error(rc) ); |
| } |
| if( Request[2] != MPI_REQUEST_NULL ) |
| { |
| MPI_Request_free(&Request[2]); |
| } |
| Request[2] = MPI_REQUEST_NULL; |
| } |
| MPI_Comm_disconnect(&Server); |
| Server = MPI_COMM_NULL; |
| } |
| else if (*comm == MPI_COMM_WORLD) |
| { |
| printf ("[%s] MPI_COMM_WORLD Communicator failed, err=%s, aborting.\n", MyName, display_error(*err)); |
| fflush (stdout); |
| // exit (mpi_err); |
| } |
| else |
| { |
| printf ("[%s] Unknown Communicator failed, err=%s, aborting.\n", MyName, display_error(*err)); |
| fflush (stdout); |
| MPI_Abort (MPI_COMM_WORLD,mpi_err); |
| } |
| } |
| |
| char *display_error (int err) |
| { |
| int len = 0; |
| int mpi_err; |
| static char errbuf[MPI_MAX_ERROR_STRING + 6]; |
| |
| mpi_err = err & 0x000000FF; |
| sprintf (errbuf, "(%2.2d) ", mpi_err); |
| MPI_Error_string (err, &errbuf[5], &len); |
| errbuf[len + 5] = '\0'; |
| |
| return errbuf; |
| } |
| |
| void close_server (char *process_name, MPI_Comm * comm) |
| { |
| int count; |
| MPI_Status status; |
| struct message_def *msg = new struct message_def; |
| |
| if (comm) {} // Avoid "unused parameter warning |
| |
| printf ("[%s] closing server %s.\n", MyName, process_name); |
| fflush (stdout); |
| |
| if( *comm != MPI_COMM_NULL ) |
| { |
| #ifdef NO_OPEN_CLOSE_NOTICES |
| // Send close notice |
| msg->type = MsgType_Close; |
| msg->noreply = false; |
| msg->reply_tag = CLOSE_TAG; |
| msg->u.request.type = ReqType_Notice; |
| msg->u.request.u.close.nid = MyNid; |
| msg->u.request.u.close.pid = MyPid; |
| strcpy (msg->u.request.u.close.process_name, MyName); |
| msg->u.request.u.close.aborted = false; |
| msg->u.request.u.close.mon = true; |
| MPI_Send(msg, sizeof(struct message_def), MPI_CHAR, 0, NOTICE_TAG, *comm); |
| #endif |
| |
| // Close communicator |
| MPI_Comm_disconnect( comm ); |
| *comm = MPI_COMM_NULL; |
| } |
| |
| msg->type = MsgType_Service; |
| msg->noreply = false; |
| msg->reply_tag = REPLY_TAG; |
| msg->u.request.type = ReqType_Close; |
| msg->u.request.u.close.nid = MyNid; |
| msg->u.request.u.close.pid = MyPid; |
| strcpy (msg->u.request.u.close.process_name, process_name); |
| sendrecv (Monitor, msg, sizeof (struct message_def), MPI_CHAR, |
| msg, sizeof (struct message_def), MPI_CHAR, SERVICE_TAG, REPLY_TAG, &status); |
| MPI_Get_count (&status, MPI_CHAR, &count); |
| if ((status.MPI_TAG == REPLY_TAG) && |
| (count == sizeof (struct message_def))) |
| { |
| if ((msg->type == MsgType_Service) && |
| (msg->u.reply.type == ReplyType_Generic)) |
| { |
| if (msg->u.reply.u.generic.return_code == MPI_SUCCESS) |
| { |
| printf |
| ("[%s] closed process successfully. Nid=%d, Pid=%d, rtn=%d\n", |
| MyName, msg->u.reply.u.generic.nid, |
| msg->u.reply.u.generic.pid, |
| msg->u.reply.u.generic.return_code); |
| } |
| else |
| { |
| printf ("[%s] close process failed, rc=%s\n", MyName, |
| display_error(msg->u.reply.u.generic.return_code)); |
| } |
| } |
| else |
| { |
| printf |
| ("[%s] Invalid MsgType(%d)/ReplyType(%d) for close message\n", |
| MyName, msg->type, msg->u.reply.type); |
| } |
| } |
| else |
| { |
| printf ("[%s] close process reply message invalid\n", MyName); |
| } |
| fflush (stdout); |
| delete msg; |
| } |
| |
| void exit_process (void) |
| { |
| int count; |
| MPI_Status status; |
| struct message_def *msg = new struct message_def; |
| |
| printf ("[%s] sending exit process message.\n", MyName); |
| fflush (stdout); |
| msg->type = MsgType_Service; |
| msg->noreply = false; |
| msg->reply_tag = REPLY_TAG; |
| msg->u.request.type = ReqType_Exit; |
| msg->u.request.u.exit.nid = MyNid; |
| msg->u.request.u.exit.pid = MyPid; |
| sendrecv (Monitor, msg, sizeof (struct message_def), MPI_CHAR, |
| msg, sizeof (struct message_def), MPI_CHAR, SERVICE_TAG, REPLY_TAG, &status); |
| MPI_Get_count (&status, MPI_CHAR, &count); |
| if ((status.MPI_TAG == REPLY_TAG) && |
| (count == sizeof (struct message_def))) |
| { |
| if ((msg->type == MsgType_Service) && |
| (msg->u.reply.type == ReplyType_Generic)) |
| { |
| if (msg->u.reply.u.generic.return_code == MPI_SUCCESS) |
| { |
| printf ("[%s] exited process successfully.\n", MyName); |
| } |
| else |
| { |
| printf ("[%s] exit process failed, rc=%s\n", MyName, |
| display_error(msg->u.reply.u.generic.return_code)); |
| } |
| flush_incoming_msgs (); |
| } |
| else |
| { |
| printf |
| ("[%s] Invalid MsgType(%d)/ReplyType(%d) for Exit message\n", |
| MyName, msg->type, msg->u.reply.type); |
| } |
| } |
| else |
| { |
| printf ("[%s] exit process reply invalid.\n", MyName); |
| } |
| fflush (stdout); |
| delete msg; |
| } |
| |
| void flush_incoming_msgs( void ) |
| { |
| int count; |
| int complete = 0; |
| bool done = false; |
| MPI_Status status; |
| struct message_def *msg = NULL; |
| |
| printf ("[%s] flush incoming event & notices.\n", MyName); |
| fflush (stdout); |
| do |
| { |
| gp_local_mon_io->get_notice( &msg ); |
| if (msg) |
| { |
| printf("[%s] Got local IO notice\n",MyName); |
| complete = true; |
| count = sizeof( *msg ); |
| status.MPI_TAG = msg->reply_tag; |
| } |
| else |
| { |
| printf("[%s] No local IO notice\n",MyName); |
| complete = false; |
| done = true; |
| } |
| |
| if (complete) |
| { |
| MPI_Get_count (&status, MPI_CHAR, &count); |
| if (((status.MPI_TAG == NOTICE_TAG) || |
| (status.MPI_TAG == EVENT_TAG ) ) && |
| (count == sizeof (struct message_def)) ) |
| { |
| if (msg->u.request.type == ReqType_Notice) |
| { |
| switch (msg->type) |
| { |
| case MsgType_ProcessDeath: |
| if ( msg->u.request.u.death.aborted ) |
| { |
| printf ("[%s] Process %s abnormally terminated. Nid=%d, Pid=%d\n", |
| MyName, msg->u.request.u.death.process_name, |
| msg->u.request.u.death.nid, msg->u.request.u.death.pid); |
| } |
| else |
| { |
| printf ("[%s] Process %s terminated normally. Nid=%d, Pid=%d\n", |
| MyName, msg->u.request.u.death.process_name, |
| msg->u.request.u.death.nid, msg->u.request.u.death.pid); |
| } |
| break; |
| |
| case MsgType_NodeDown: |
| printf ("[%s] Node %d (%s) is DOWN\n", |
| MyName, msg->u.request.u.down.nid, msg->u.request.u.down.node_name ); |
| break; |
| |
| case MsgType_NodeUp: |
| printf ("[%s] Node %d (%s) is UP\n", |
| MyName, msg->u.request.u.up.nid, msg->u.request.u.up.node_name); |
| break; |
| |
| case MsgType_Change: |
| printf ("[%s] Configuration Change Notice for Group: %s Key: %s\n", |
| MyName, |
| msg->u.request.u.change.group, |
| msg->u.request.u.change.key); |
| break; |
| case MsgType_Open: |
| case MsgType_Close: |
| printf ("[%s] Open/Close process notification\n", MyName); |
| break; |
| |
| case MsgType_Event: |
| printf("[%s] Event %d received\n", |
| MyName, msg->u.request.u.event_notice.event_id); |
| break; |
| |
| case MsgType_Shutdown: |
| printf("[%s] Shutdown notice, level=%d received\n", |
| MyName, msg->u.request.u.shutdown.level); |
| break; |
| |
| default: |
| printf("[%s] Invalid Notice Type(%d) for flush message\n", |
| MyName, msg->type); |
| } |
| } |
| else |
| { |
| printf("[%s] Invalid Event received - msgtype=%d, noreply=%d, reqtype=%d\n", |
| MyName, msg->type, msg->noreply, msg->u.request.type); |
| } |
| } |
| else |
| { |
| printf ("[%s] Failed to flush messages\n", MyName); |
| done = true; |
| } |
| fflush (stdout); |
| } |
| if (msg) delete msg; |
| } |
| while (!done); |
| } |
| |
| int open_server (char *process_name, MPI_Comm * comm) |
| { |
| int rc; |
| int count; |
| MPI_Status status; |
| struct message_def *msg = new struct message_def; |
| |
| printf ("[%s] opening server %s.\n", MyName, process_name); |
| fflush (stdout); |
| msg->type = MsgType_Service; |
| msg->noreply = false; |
| msg->reply_tag = REPLY_TAG; |
| msg->u.request.type = ReqType_Open; |
| msg->u.request.u.open.nid = MyNid; |
| msg->u.request.u.open.pid = MyPid; |
| strcpy (msg->u.request.u.open.target_process_name, process_name); |
| msg->u.request.u.open.death_notification = 1; |
| sendrecv (Monitor, msg, sizeof (struct message_def), MPI_CHAR, |
| msg, sizeof (struct message_def), MPI_CHAR, SERVICE_TAG, REPLY_TAG, &status); |
| MPI_Get_count (&status, MPI_CHAR, &count); |
| if ((status.MPI_TAG == REPLY_TAG) && |
| (count == sizeof (struct message_def))) |
| { |
| if ((msg->type == MsgType_Service) && |
| (msg->u.reply.type == ReplyType_Open)) |
| { |
| if (msg->u.reply.u.open.return_code == MPI_SUCCESS) |
| { |
| printf ("[%s] opened process successfully. Nid=%d, Pid=%d, Port=%s\n", |
| MyName, |
| msg->u.reply.u.open.nid, |
| msg->u.reply.u.open.pid, |
| msg->u.reply.u.open.port); |
| rc = MPI_Comm_connect (msg->u.reply.u.open.port, MPI_INFO_NULL, |
| 0, MPI_COMM_SELF, comm); |
| if (rc == MPI_SUCCESS) |
| { |
| MPI_Comm_set_errhandler (*comm, CommHandler); |
| #ifdef NO_OPEN_CLOSE_NOTICES |
| // Send open notice to server |
| msg->type = MsgType_Open; |
| msg->noreply = true; |
| msg->u.request.type = ReqType_Notice; |
| msg->u.request.u.open.nid = MyNid; |
| msg->u.request.u.open.pid = MyPid; |
| strcpy (msg->u.request.u.open.target_process_name, MyName); |
| MPI_Send(msg,sizeof(struct message_def),MPI_CHAR,0,NOTICE_TAG,*comm); |
| #endif |
| printf ("[%s] connected to process.\n", MyName); |
| fflush (stdout); |
| } |
| else |
| { |
| printf ("[%s] failed to connected. rc = %s\n", |
| MyName, |
| display_error(rc) ); |
| } |
| } |
| else |
| { |
| rc = msg->u.reply.u.open.return_code; |
| printf ("[%s] open process failed, rc = %s\n", |
| MyName, |
| display_error(rc)); |
| } |
| } |
| else |
| { |
| rc = MPI_ERR_UNKNOWN; |
| printf |
| ("[%s] Invalid MsgType(%d)/ReplyType(%d) for open message\n", |
| MyName, msg->type, msg->u.reply.type); |
| } |
| } |
| else |
| { |
| rc = MPI_ERR_UNKNOWN; |
| printf ("[%s] open process reply message invalid\n", MyName); |
| } |
| fflush (stdout); |
| delete msg; |
| |
| return( rc ); |
| } |
| |
| int open_server_retry(char *process_name, MPI_Comm * comm) |
| { |
| int rc; |
| int openTries = 0; |
| |
| do |
| { |
| rc = open_server( process_name, comm ); |
| if ( rc == MPI_SUCCESS ) |
| { |
| break; |
| } |
| sleep(1); |
| openTries++; |
| } |
| while( rc == MPI_ERR_EXITED && openTries < 10 ); |
| if (rc != MPI_SUCCESS) |
| { |
| printf ("[%s] Open server tries exceeded, aborting\n", MyName); |
| MPI_Abort(MPI_COMM_WORLD,99); |
| } |
| |
| return( rc ); |
| } |
| |
| void process_startup (int argc, char *argv[]) |
| { |
| int i; |
| struct message_def *msg; |
| |
| printf ("[%s] processing startup.\n", argv[5]); |
| fflush (stdout); |
| |
| printf ("[%s] - argc=%d", argv[5], argc); |
| for(i=0; i<argc; i++) |
| { |
| printf (", argv[%d]=%s",i,argv[i]); |
| } |
| printf ("\n"); |
| fflush(stdout); |
| |
| strcpy (MyName, argv[5]); |
| MPI_Open_port (MPI_INFO_NULL, MyPort); |
| |
| #ifdef OFED_MUTEX |
| // free monitor.sem semaphore |
| printf ("[%s] Opening mutex\n",MyName); |
| fflush(stdout); |
| char sem_name[MAX_PROCESS_PATH]; |
| sprintf(sem_name,"/monitor.sem2.%s",getenv("USER")); |
| sem_t *mutex = sem_open(sem_name,0,0644,0); |
| if(mutex == SEM_FAILED) |
| { |
| printf ("[%s] Can't access %s semaphore\n", MyName, sem_name); |
| sem_close(mutex); |
| abort(); |
| } |
| printf ("[%s] Putting mutex\n",MyName); |
| fflush(stdout); |
| sem_post(mutex); |
| sem_close(mutex); |
| #endif |
| MyNid = atoi(argv[3]); |
| MyPid = atoi(argv[4]); |
| gv_ms_su_verif = atoi(argv[9]); |
| printf ("[%s] process_startup, MyNid: %d, lio: %p\n", |
| MyName, MyNid, (void *)gp_local_mon_io ); |
| |
| gp_local_mon_io->iv_pid = MyPid; |
| gp_local_mon_io->init_comm(); |
| |
| if (argc < 10) |
| { |
| printf |
| ("Error: Invalid startup arguments, argc=%d, argv[0]=%s, argv[1]=%s, argv[2]=%s, argv[3]=%s, argv[4]=%s, argv[5]=%s, argv[6]=%s, argv[7]=%s, argv[8]=%s, argv[9]=%s\n", |
| argc, argv[0], argv[1], argv[2], argv[3], argv[4], argv[5], argv[6], argv[7], argv[8], argv[9]); |
| exit (1); |
| } |
| else |
| { |
| gp_local_mon_io->acquire_msg( &msg ); |
| |
| msg->type = MsgType_Service; |
| msg->noreply = true; |
| msg->u.request.type = ReqType_Startup; |
| msg->u.request.u.startup.nid = MyNid; |
| msg->u.request.u.startup.pid = MyPid; |
| msg->u.request.u.startup.paired = false; |
| strcpy (msg->u.request.u.startup.process_name, argv[5]); |
| strcpy (msg->u.request.u.startup.port_name, MyPort); |
| msg->u.request.u.startup.os_pid = getpid (); |
| msg->u.request.u.startup.event_messages = true; |
| msg->u.request.u.startup.system_messages = true; |
| msg->u.request.u.startup.verifier = true; |
| msg->u.request.u.startup.startup_size = sizeof(msg->u.request.u.startup); |
| printf ("[%s] sending startup reply to monitor.\n", argv[5]); |
| fflush (stdout); |
| |
| gp_local_mon_io->send( msg ); |
| |
| printf ("[%s] Startup completed", argv[5]); |
| if (argc > 9) |
| { |
| for (i = 10; i < argc; i++) |
| { |
| printf (", argv[%d]=%s", i, argv[i]); |
| switch( i ) |
| { |
| case 10: |
| TestNum = atoi(argv[i]); |
| printf ("(%d)", TestNum); |
| break; |
| default: // ignore |
| break; |
| } |
| } |
| } |
| printf ("\n"); |
| fflush (stdout); |
| } |
| } |
| |
| void recv_localio_msg(struct message_def *recv_msg, int size) |
| { |
| MPI_Status status; |
| |
| size = size; // Avoid "unused parameter" warning |
| |
| printf("[%s] Message received: ",MyName); |
| switch ( recv_msg->type ) |
| { |
| case MsgType_Service: |
| printf("Service Reply: Type=%d, ReplyType=%d\n",recv_msg->type, recv_msg->u.reply.type); |
| break; |
| |
| case MsgType_Event: |
| printf("Event - %d\n",recv_msg->u.request.u.event_notice.event_id); |
| break; |
| |
| case MsgType_UnsolicitedMessage: |
| printf("Unsolicited Message Received:\n"); |
| break; |
| |
| default: |
| printf("Notice: Type=%d, RequestType=%d\n",recv_msg->type, recv_msg->u.request.type); |
| status.count = sizeof (*recv_msg); |
| status.MPI_TAG = NOTICE_TAG; |
| |
| check_notice( recv_msg, &status ); |
| } |
| } |
| |
| int sendrecv (MPI_Comm comm, void *sbuf, int ssize, MPI_Datatype stype, |
| void *rbuf, int rsize, MPI_Datatype rtype, int stag, int rtag, MPI_Status *status) |
| { |
| int rc; |
| int idx; |
| bool done = false; |
| MPI_Status local_status; |
| struct message_def *msg = NULL; |
| |
| death = false; |
| if (status == NULL) |
| { |
| status = &local_status; |
| } |
| |
| if ( gp_local_mon_io && !comm ) |
| { |
| printf ("[%s] sendrecv on monitor called.\n",MyName); |
| gp_local_mon_io->acquire_msg( &msg ); |
| memmove( msg, sbuf, gp_local_mon_io->size_of_msg( (struct message_def *)sbuf ) ); |
| gp_local_mon_io->send_recv( msg ); |
| if (msg) |
| { |
| memmove( rbuf, msg, gp_local_mon_io->size_of_msg( msg ) ); |
| status->count = sizeof( struct message_def ); |
| status->MPI_TAG = msg->reply_tag; |
| rc = MPI_SUCCESS; |
| } |
| else |
| { |
| rc = MPI_ERR_REQUEST; |
| } |
| gp_local_mon_io->release_msg(msg); |
| } |
| else |
| { |
| if (Request[1] == MPI_REQUEST_NULL) |
| { |
| printf ("[%s] MPI_Isend on comm called.\n", MyName); |
| rc = MPI_Isend (sbuf, ssize, stype, 0, stag, comm, &Request[1]); |
| if (rc != MPI_SUCCESS) |
| { |
| printf ("[%s] Monitor MPI_Isend failure, rc=%s\n", |
| MyName, display_error (rc)); |
| done = true; |
| } |
| } |
| else |
| { |
| printf ("[%s] MPI_Isend overrun.\n", MyName); |
| } |
| |
| do |
| { |
| if (comm != MPI_COMM_NULL) |
| { |
| if (Request[2] == MPI_REQUEST_NULL) |
| { |
| printf ("[%s] MPI_Irecv on comm called.\n", MyName); |
| rc = MPI_Irecv (rbuf, rsize, rtype, 0, rtag, comm, &Request[2]); |
| if (rc != MPI_SUCCESS) |
| { |
| printf ("[%s] Client MPI_Irecv failure, rc=%s\n", |
| MyName, display_error (rc)); |
| } |
| } |
| } |
| fflush(stdout); |
| idx = MPI_UNDEFINED; |
| rc = MPI_Waitany (3, Request, &idx, status); |
| if (rc == MPI_SUCCESS) |
| { |
| switch(idx) |
| { |
| case 1: |
| printf("[%s] Isend to comm completed\n",MyName); |
| done = true; |
| break; |
| case 2: |
| printf("[%s] Irecv from comm completed\n",MyName); |
| break; |
| default: |
| printf("[%s] Undefined completion index\n",MyName); |
| } |
| } |
| else |
| { |
| printf ("[%s] MPI_Waitany failed, rc=%s\n", MyName, display_error (rc)); |
| break; |
| } |
| fflush(stdout); |
| } |
| while ( !((idx == 2 && done) || death || idx == MPI_UNDEFINED) ); |
| |
| if (idx == 0 || idx == MPI_UNDEFINED) |
| { |
| if( Request[1] != MPI_REQUEST_NULL ) |
| { |
| printf ("[%s] Canceling Isend on comm\n", MyName ); |
| rc = MPI_Cancel (&Request[1]); |
| if( rc != MPI_SUCCESS ) |
| { |
| printf ("[%s] Isend Cancel failed, rc = %s\n", MyName, display_error(rc) ); |
| } |
| Request[1] = MPI_REQUEST_NULL; |
| } |
| if( Request[2] != MPI_REQUEST_NULL ) |
| { |
| printf ("[%s] Canceling Irecv on comm\n", MyName ); |
| rc = MPI_Cancel (&Request[2]); |
| if( rc != MPI_SUCCESS ) |
| { |
| printf ("[%s] Irecv Cancel failed, rc = %s\n", MyName, display_error(rc) ); |
| } |
| Request[2] = MPI_REQUEST_NULL; |
| } |
| |
| rc = MPI_ERR_EXITED; |
| } |
| } |
| fflush(stdout); |
| |
| return rc; |
| } |
| |
| void test1(void) |
| { |
| int i; |
| int rc; |
| int tries = 0; |
| int sendbuf[3]; |
| char recvbuf[100]; |
| |
| for (i = 0; i < CYCLES_MAX + 1; i++) |
| { |
| sendbuf[0] = MyRank; |
| sendbuf[1] = i; |
| switch (i) |
| { |
| case TAKEOVER_1: // abort the primary |
| case TAKEOVER_2: // aborts the original backup after takeover due to TAKEOVER_1 |
| case TAKEOVER_3: // aborts the primary after takeover due to TAKEOVER_2 |
| sendbuf[2] = CMD_ABORT; |
| break; |
| case CYCLES_MAX: |
| sendbuf[2] = CMD_END; |
| break; |
| default: |
| sendbuf[2] = CMD_CONT; |
| } |
| rc = sendrecv (Server, sendbuf, 3, MPI_INT, recvbuf, 100, MPI_CHAR, USER_TAG, USER_TAG); |
| if (rc == MPI_SUCCESS) |
| { |
| printf ("[%s] Cycle # %d - %s\n", MyName, i, recvbuf); |
| fflush (stdout); |
| } |
| else |
| { |
| printf ("[%s] Cycle # %d - failed, rc=%s\n", MyName, i, display_error (rc)); |
| printf ("[%s] Re-Connecting to server\n", MyName); |
| fflush (stdout); |
| tries++; |
| if ( tries > 3 ) |
| { |
| printf ("[%s] Tries exceeded, aborting\n", MyName); |
| MPI_Abort(MPI_COMM_WORLD,99); |
| } |
| if (!death) |
| { |
| wait_for_death_notice(); |
| } |
| close_server((char *) "$SERV0", &Server); |
| sleep(1); |
| open_server_retry((char *) "$SERV0", &Server); |
| i--; |
| } |
| } |
| } |
| |
| void test2(void) |
| { |
| int i = 0; |
| int rc; |
| int sendbuf[3]; |
| char recvbuf[100]; |
| |
| do |
| { |
| sendbuf[0] = MyRank; |
| sendbuf[1] = 1; |
| sendbuf[2] = CMD_CONT; |
| rc = sendrecv (Server, sendbuf, 3, MPI_INT, recvbuf, 100, MPI_CHAR, USER_TAG, USER_TAG); |
| if (rc == MPI_SUCCESS) |
| { |
| printf ("[%s] Send # 1 - %s\n", MyName, recvbuf); |
| fflush (stdout); |
| } |
| else |
| { |
| i++; |
| printf ("[%s] Cycle # %d - failed, rc=%s\n", MyName, i, display_error (rc)); |
| printf ("[%s] Re-Connecting to server\n", MyName); |
| fflush (stdout); |
| if (!death) |
| { |
| wait_for_death_notice(); |
| } |
| close_server((char *) "$SERV0", &Server); |
| sleep(1); |
| open_server_retry((char *) "$SERV0", &Server); |
| } |
| } while ( rc != MPI_SUCCESS && i < 30 ); |
| |
| sendbuf[0] = MyRank; |
| sendbuf[1] = 2; |
| sendbuf[2] = CMD_CONT; |
| rc = sendrecv (Server, sendbuf, 3, MPI_INT, recvbuf, 100, MPI_CHAR, USER_TAG, USER_TAG); |
| if (rc == MPI_SUCCESS) |
| { |
| printf ("[%s] Send # 2 - %s\n", MyName, recvbuf); |
| fflush (stdout); |
| } |
| else |
| { |
| printf ("[%s] FAILED - final send to server, rc=%s\n", MyName, display_error (rc)); |
| printf ("[%s] Aborting\n", MyName); |
| fflush (stdout); |
| MPI_Abort(MPI_COMM_WORLD,1); |
| } |
| |
| } |
| |
| void wait_for_death_notice (void) |
| { |
| struct message_def *notice = new message_def; |
| |
| printf ("[%s] waiting for death Notice->\n", MyName); |
| fflush (stdout); |
| while (!death) |
| { |
| // just wait for call back to receive notice. |
| usleep(1000); |
| |
| } |
| fflush (stdout); |
| delete notice; |
| } |
| |
| void InitLocalIO( void ) |
| { |
| char *cmd_buffer; |
| |
| if ( MyPNid == -1 ) |
| { |
| CClusterConfig ClusterConfig; // 'cluster.conf' objects |
| CPNodeConfig *pnodeConfig; |
| CLNodeConfig *lnodeConfig; |
| |
| if ( ClusterConfig.Initialize() ) |
| { |
| if ( ! ClusterConfig.LoadConfig() ) |
| { |
| printf("[%s], Failed to load cluster configuration.\n", MyName); |
| abort(); |
| } |
| } |
| else |
| { |
| printf( "[%s] Warning: No cluster.conf found\n",MyName); |
| if (MyNid == -1) |
| { |
| printf( "[%s] Warning: set default virtual node ID = 0\n",MyName); |
| MyNid = 0; |
| } |
| abort(); |
| } |
| |
| lnodeConfig = ClusterConfig.GetLNodeConfig( MyNid ); |
| pnodeConfig = lnodeConfig->GetPNodeConfig(); |
| gv_ms_su_nid = MyPNid = pnodeConfig->GetPNid(); |
| printf ("[%s] Local IO pnid = %d\n", MyName, MyPNid); |
| } |
| |
| gp_local_mon_io = new Local_IO_To_Monitor( -1 ); |
| cmd_buffer = getenv("SQ_LOCAL_IO_SHELL_TRACE"); |
| if (cmd_buffer && *cmd_buffer == '1') |
| { |
| gp_local_mon_io->cv_trace = true; |
| |
| char tracefile[MAX_SEARCH_PATH]; |
| char *tmpDir; |
| |
| tmpDir = getenv( "MPI_TMPDIR" ); |
| if (tmpDir) |
| { |
| sprintf( tracefile, "%s/shell.trace.%d", tmpDir, getpid() ); |
| } |
| else |
| { |
| sprintf( tracefile, "./shell.trace.%d", getpid() ); |
| } |
| |
| shell_locio_trace_file = fopen(tracefile, "w+"); |
| gp_local_mon_io->cp_trace_cb = shell_locio_trace; |
| } |
| } |
| |
| int main (int argc, char *argv[]) |
| { |
| // Setup HP_MPI software license |
| int key = 413675219; //413675218 to display banner |
| MPI_Initialized(&key); |
| |
| MPI_Init (&argc, &argv); |
| MPI_Comm_rank (MPI_COMM_WORLD, &MyRank); |
| MyName = new char [MAX_PROCESS_PATH]; |
| strcpy( MyName, argv[5] ); |
| MyNid = atoi(argv[3]); |
| |
| if ( ! gp_local_mon_io ) |
| { |
| InitLocalIO(); |
| assert (gp_local_mon_io); |
| } |
| |
| gp_local_mon_io->set_cb(recv_localio_msg, "notice"); |
| gp_local_mon_io->set_cb(recv_localio_msg, "recv"); |
| |
| process_startup (argc, argv); |
| |
| MPI_Errhandler_create (CommRecovery, &CommHandler); |
| MPI_Comm_set_errhandler (MPI_COMM_WORLD, CommHandler); |
| |
| open_server ((char *) "$SERV0", &Server); |
| if ( Server == MPI_COMM_NULL ) |
| { |
| printf ("[%s] Can't connect to server\n", MyName); |
| exit (1); |
| } |
| |
| // do the work |
| printf ("[%s] Starting work with server\n", MyName); |
| fflush (stdout); |
| |
| switch ( TestNum ) |
| { |
| case 1: |
| test1(); |
| break; |
| case 2: |
| test2(); |
| break; |
| default: |
| printf ("[%s] FAILED - Invalid test number (%d)\n", MyName, TestNum); |
| printf ("[%s] Aborting\n", MyName); |
| fflush (stdout); |
| MPI_Abort(MPI_COMM_WORLD,1); |
| break; |
| } |
| // close the server processes |
| close_server ((char *) "$SERV0", &Server); |
| //wait_for_death_notice(); |
| |
| // exit my process |
| exit_process (); |
| |
| if ( gp_local_mon_io ) |
| { |
| delete gp_local_mon_io; |
| } |
| |
| printf ("[%s] calling Finalize!\n", MyName); |
| fflush (stdout); |
| MPI_Close_port(MyPort); |
| MPI_Comm_set_errhandler( MPI_COMM_WORLD, MPI_ERRORS_RETURN ); |
| MPI_Errhandler_free( &CommHandler ); |
| MPI_Finalize (); |
| |
| exit (0); |
| } |