blob: 463b28b79a7e45d10c248578f2c5cbe50d2619d6 [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
///////////////////////////////////////////////////////////////////////////////
#include <stdio.h>
#include <stdlib.h>
#include "clio.h"
#include "sqevlog/evl_sqlog_writer.h"
#include "montestutil.h"
#include "xmpi.h"
MonTestUtil util;
long trace_settings = 0;
FILE *shell_locio_trace_file = NULL;
bool tracing = false;
// Server message tags
#define USER_TAG 100
// Server process commands
#define CMD_CONT 1
#define CMD_END 2
#define CMD_ABORT 3
const char *MyName;
int MyRank = -1;
int gv_ms_su_nid = -1; // Local IO nid to make compatible w/ Seabed
SB_Verif_Type gv_ms_su_verif = -1;
char ga_ms_su_c_port[MPI_MAX_PORT_NAME] = {0}; // connect
const int MAX_WORKERS = 4;
int deathNoticeCount = 0;
int workerNid[MAX_WORKERS];
int workerPid[MAX_WORKERS];
int workerVerifier[MAX_WORKERS];
bool workerDeathNotice[] = { false, false, false, false };
bool testSuccess = true;
void connectServer (char *port, MPI_Comm * comm)
{
int rc;
rc = XMPI_Comm_connect (port, MPI_INFO_NULL, 0, MPI_COMM_SELF, comm);
if (rc == MPI_SUCCESS)
{
XMPI_Comm_set_errhandler (*comm, MPI_ERRORS_RETURN);
}
else
{
printf ("[%s] failed to connect to %s. rc = (%d)\n", MyName, port, rc);
abort();
}
}
// Routine for handling notices
void recv_notice_msg(struct message_def *recv_msg, int )
{
if ( recv_msg->type == MsgType_ProcessDeath )
{
if ( tracing )
{
printf("[%s] Process death notice received for %s (%d, %d:%d)\n",
MyName,
recv_msg->u.request.u.death.process_name,
recv_msg->u.request.u.death.nid,
recv_msg->u.request.u.death.pid,
recv_msg->u.request.u.death.verifier);
}
bool found = false;
for (int i=0; i<MAX_WORKERS; i++)
{
if (workerNid[i] == recv_msg->u.request.u.death.nid
&& workerPid[i] == recv_msg->u.request.u.death.pid
&& workerVerifier[i] == recv_msg->u.request.u.death.verifier)
{
workerDeathNotice[i] = true;
++deathNoticeCount;
found = true;
break;
}
}
if (!found)
{
printf("[%s] Could not find worker for (%d, %d:%d)\n",
MyName,
recv_msg->u.request.u.death.nid,
recv_msg->u.request.u.death.pid,
recv_msg->u.request.u.death.verifier);
}
}
else
{
printf("[%s] unexpected notice, type=%s\n", MyName,
MessageTypeString( recv_msg->type));
testSuccess = false;
}
}
int main (int argc, char *argv[])
{
const int MAX_CYCLES = 60;
int i;
int j;
int rc;
bool workerUp[MAX_WORKERS];
int sendbuf[3];
char recvbuf[100];
MPI_Comm manager_comm[MAX_WORKERS];
MPI_Status status;
char workerPort[MAX_WORKERS][MPI_MAX_PORT_NAME];
bool workerStartProcess[] = {false, false, true, true};
const char *workerName[] = {"$SERV0","$SERV1", "$SERV2", "$SERV3"};
int workerMsgCount[MAX_WORKERS];
char *serverArgs[1] = {(char *) "-t"};
int workerChildren = 0;
char procName[25];
for (i = 0; i < MAX_WORKERS; i++)
{
workerVerifier[i] = -1;
}
util.processArgs (argc, argv);
tracing = util.getTrace();
MyName = util.getProcName();
util.InitLocalIO( );
assert (gp_local_mon_io);
// Set local io callback function for "notices"
gp_local_mon_io->set_cb(recv_notice_msg, "notice");
util.requestStartup ();
sleep (2);
for (i = 0; i < MAX_WORKERS; i++)
{
manager_comm[i] = MPI_COMM_NULL;
workerUp[i] = false;
workerMsgCount[i] = 0;
if ( workerStartProcess[i] )
{
if ( tracing ) printf ("[%s] util.requestNewProcess(workerName[%d]=%s)\n", MyName, i, workerName[i]);
fflush (stdout);
util.requestNewProcess( 1
, ProcessType_Generic
, false
, workerName[i]
, "server"
, ""
, ""
, ((tracing) ? 1: 0)
, serverArgs
, workerNid[i]
, workerPid[i]
, workerVerifier[i]
, procName);
++workerChildren;
}
if ( tracing ) printf ("[%s] util.requestOpen(workerName[%d]=%s)\n", MyName, i, workerName[i]);
fflush (stdout);
if (util.requestOpen( workerName[i]
, workerVerifier[i]
, 0
, workerPort[i] ))
{
if ( tracing ) printf ("[%s] connectServer(workerName[%d]=%s)\n", MyName, i, workerName[i]);
fflush (stdout);
connectServer (workerPort[i], &manager_comm[i]);
}
if (manager_comm[i] != MPI_COMM_NULL)
{
workerUp[i] = true;
if ( tracing ) printf ("[%s] worker %d connected.\n", MyName, i);
}
else
{
workerUp[i] = false;
testSuccess = false;
printf ("[%s] worker %d failed to connect.\n", MyName, i);
}
fflush (stdout);
}
// do the work
if ( tracing )
{
printf ("[%s] Starting work with %d workers\n", MyName, MAX_WORKERS);
}
for (j = 0; j < MAX_CYCLES + 1; j++)
{
for (i = 0; i < MAX_WORKERS; i++)
{
if (workerUp[i])
{
sendbuf[0] = MyRank;
sendbuf[1] = i;
switch (j)
{
case 1:
sendbuf[2] = CMD_CONT;
break;
case MAX_CYCLES:
sendbuf[2] = CMD_END;
break;
default:
sendbuf[2] = CMD_CONT;
}
rc = XMPI_Sendrecv (sendbuf, 3, MPI_INT, 0, USER_TAG,
recvbuf, 100, MPI_CHAR, MPI_ANY_SOURCE,
USER_TAG, manager_comm[i], &status);
if (rc != MPI_SUCCESS)
{
printf
("[%s] SendRecv failed to worker %d, rc = (%d)%s\n",
MyName, i, rc, util.MPIErrMsg(rc));
util.closeProcess( manager_comm[i] );
sleep (5);
if (util.requestOpen( workerName[i]
, workerVerifier[i]
, 0
, workerPort[i] ))
{
connectServer (workerPort[i], &manager_comm[i]);
}
testSuccess = false;
}
else
{
++workerMsgCount[i];
}
}
else
{
printf ("[%s] Not sending to worker %d\n", MyName, i);
}
}
}
// close the server processes
for (i = 0; i < MAX_WORKERS; i++)
{
if (workerUp[i])
{
util.closeProcess ( manager_comm[i] );
}
if ( tracing )
{
printf("[%s] %d messages sent/received for worker %d\n", MyName,
workerMsgCount[i], i);
}
if ( workerMsgCount[i] != ( MAX_CYCLES + 1 ))
{
printf ("[%s] for worker %d, expecting %d messages but got %d\n",
MyName, i, ( MAX_CYCLES + 1 ), workerMsgCount[i]);
testSuccess = false;
}
}
// Wait until all death notices received or time-out
for (int i=0; i<5; i++)
{
if (deathNoticeCount == workerChildren) break;
sleep(1);
}
// Verify that got all death notices
for (int i=0; i<MAX_WORKERS; i++)
{
if (workerStartProcess[i] && !workerDeathNotice[i])
{
printf("[%s] No death notice received for worker %d, "
" %s (%d, %d)\n",
MyName, i, workerName[i], workerNid[i], workerPid[i]);
testSuccess = false;
}
}
printf("Multi-Node Test:\t\t%s\n", (testSuccess) ? "PASSED" : "FAILED");
// tell monitor we are exiting
util.requestExit ( );
XMPI_Close_port (util.getPort());
if ( gp_local_mon_io )
{
delete gp_local_mon_io;
}
exit (0);
}