blob: 014e9a571b5ded0e11f1cad420904bd973be3d31 [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
///////////////////////////////////////////////////////////////////////////////
#include <stdio.h>
#include <stdlib.h>
#include "clio.h"
#include "sqevlog/evl_sqlog_writer.h"
#include "montestutil.h"
#include "xmpi.h"
MonTestUtil util;
long trace_settings = 0;
FILE *shell_locio_trace_file = NULL;
bool tracing = false;
bool shutdownBeforeStartup = false;
bool shutdownSent = false;
// Server message tags
#define USER_TAG 100
// Server process commands
#define CMD_CONT 1
#define CMD_END 2
#define CMD_ABORT 3
// global variables
int MyRank = -1;
const char *MyName;
int gv_ms_su_nid = -1; // Local IO nid to make compatible w/ Seabed
SB_Verif_Type gv_ms_su_verif = -1;
char ga_ms_su_c_port[MPI_MAX_PORT_NAME] = {0}; // connect
MPI_Comm worker_comm;
// Routine for handling notices
void recv_notice_msg(struct message_def *recv_msg, int )
{
if ( recv_msg->type == MsgType_Shutdown )
{
if ( tracing )
printf("[%s] Shutdown notice received, level=%d\n",
MyName, recv_msg->u.request.u.shutdown.level);
shutdownSent = true;
}
else
{
printf("[%s] unexpected notice, type=%s\n", MyName,
MessageTypeString( recv_msg->type));
}
}
void server ()
{
int j = 0,
rc;
int recvbuf[3];
char sendbuf[100];
bool done = false;
MPI_Status status;
do
{
j++;
if ( tracing )
{
printf("[%s] waiting for message.\n",MyName);
}
rc = XMPI_Recv (recvbuf, 3, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG,
worker_comm, &status);
if (rc == MPI_SUCCESS)
{
switch (recvbuf[2])
{
case CMD_CONT:
sprintf (sendbuf, "[%s] Received (%d:%d) CMD_CONT", MyName,
recvbuf[0], recvbuf[1]);
break;
case CMD_END:
sprintf (sendbuf, "[%s] Received (%d:%d) CMD_END", MyName,
recvbuf[0], recvbuf[1]);
done = true;
break;
case CMD_ABORT:
sprintf (sendbuf, "[%s] Received (%d:%d) CMD_ABORT", MyName,
recvbuf[0], recvbuf[1]);
if ( tracing )
{
printf ("[%s] Received CMD_ABORT, exiting\n", MyName);
}
exit (1);
default:
sprintf (sendbuf, "[%s] Received (%d:%d) UNKNOWN", MyName,
recvbuf[0], recvbuf[1]);
}
if ( tracing )
{
printf("[%s] message received <%s>.\n", MyName, sendbuf);
}
rc = XMPI_Send (sendbuf, (int) strlen (sendbuf) + 1, MPI_CHAR, 0,
USER_TAG, worker_comm);
}
if (rc != MPI_SUCCESS)
{
printf ("[%s] try %d failed with rc = (%d)%s\n", MyName, j, rc,
util.MPIErrMsg(rc));
util.closeProcess ( worker_comm );
printf ("[%s] Waiting to Accept connection.\n", MyName);
XMPI_Comm_accept (util.getPort(), MPI_INFO_NULL, 0, MPI_COMM_SELF,
&worker_comm);
XMPI_Comm_set_errhandler (worker_comm, MPI_ERRORS_RETURN);
printf ("[%s] Reconnected.\n", MyName);
}
else
{
if ( tracing )
{
printf("[%s] reply sent.\n",MyName);
}
}
}
while (!done);
}
int main (int argc, char *argv[])
{
util.processArgs (argc, argv);
tracing = util.getTrace();
MyName = util.getProcName();
util.InitLocalIO( );
assert (gp_local_mon_io);
if (util.getShutdownBeforeStartup())
{
sleep(2); // wait for shutdown to be sent
if ( tracing )
{
printf ("[%s] Sending startup message\n", MyName);
}
util.requestStartup ();
gp_local_mon_io->set_cb(recv_notice_msg, "notice");
do
{
printf ("[%s] Waiting for shutdown message.\n", MyName);
fflush (stdout);
sleep(1);
}
while (!shutdownSent);
// tell monitor we are exiting
util.requestExit ( );
if ( tracing )
{
printf ("[%s] exiting.\n", MyName);
}
if (gp_local_mon_io)
{
delete gp_local_mon_io;
}
exit (0);
}
else if ( util.getNodedownBeforeStartup() )
{
if ( tracing )
{
printf ("[%s] nodedownBeforeStartup mode, going to sleep\n", MyName);
fflush (stdout);
}
sleep(8);
// tell monitor we are exiting
util.requestExit ( );
// now exit
if ( tracing )
{
printf ("[%s] exiting.\n", MyName);
}
if (gp_local_mon_io)
{
delete gp_local_mon_io;
}
exit (0);
}
else
{
if ( tracing )
{
printf ("[%s] Shutdown or Nodedown before startup not set\n", MyName);
}
}
util.requestStartup ();
if ( tracing )
{
printf ("[%s] Port: %s\n", MyName, util.getPort());
}
if ( tracing )
{
printf ("[%s] Wait to connect.\n", MyName);
}
XMPI_Comm_accept (util.getPort(), MPI_INFO_NULL, 0, MPI_COMM_SELF,
&worker_comm);
XMPI_Comm_set_errhandler (worker_comm, MPI_ERRORS_RETURN);
if ( tracing )
{
printf ("[%s] Connected.\n", MyName);
}
/*
// need to modify this so that we look for "waitforclose" or some other argument that means we should not process requests.
if (argc==7 || argc==9)
{
*/
if ( tracing )
{
printf("[%s] Ready to process client requests\n",MyName);
}
server ();
/*
}
else
{
printf("[%s] Not processing client requests\n",MyName);
sleep(10);
}
*/
if ( tracing )
{
printf ("[%s] disconnecting.\n", MyName);
}
util.closeProcess ( worker_comm );
XMPI_Close_port (util.getPort());
// need to decide if any test requires the server to exit abnormally.
// if so probably would be best to have a server argument that specifies
// that.
/*
if ((strcmp("$SERV2",MyName)==0) ||
(strcmp("$SERV3",MyName)==0) )
{
*/
// tell monitor we are exiting
util.requestExit ( );
/*
}
*/
if ( tracing )
{
printf ("[%s] exiting.\n", MyName);
}
if (gp_local_mon_io)
{
delete gp_local_mon_io;
}
exit (0);
}