blob: 9950d1629ddf513e80e34eb8165b6500dcdf4d72 [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
///////////////////////////////////////////////////////////////////////////////
// deathWatch: performs process startup, gets commands from the controller
// process to register interest in the death of other processes, waits for
// death notices, reports status, and exits upon command.
#include <stdio.h>
#include <stdlib.h>
#include "clio.h"
#include "sqevlog/evl_sqlog_writer.h"
#include "montestutil.h"
#include "xmpi.h"
#include "deathNotice.h"
// Test helper used for startup, process lookup, death-notice registration,
// exit, and port/connection handling throughout this program.
MonTestUtil util;
// NOTE(review): trace_settings and shell_locio_trace_file are not referenced
// in this file; presumably required by linked local-IO/trace code - confirm.
long trace_settings = 0;
FILE *shell_locio_trace_file = NULL;
// Set from util.getTrace() in main; gates the "[name] ..." trace output.
bool tracing = false;
// Set from util.getProcName() in main; used as the prefix of every message.
const char *MyName;
int gv_ms_su_nid = -1; // Local IO nid to make compatible w/ Seabed
// NOTE(review): like gv_ms_su_nid these appear to exist for Seabed
// compatibility; they are not referenced in this file - confirm.
SB_Verif_Type gv_ms_su_verif = -1;
char ga_ms_su_c_port[MPI_MAX_PORT_NAME] = {0};
// Name of the process whose death we are watching for; filled in by
// CMD_REG_INTEREST and compared against incoming ProcessDeath notices.
char victimName[25];
// Set by recv_notice_msg when the victim's death notice arrives; reported
// back to the controller by CMD_GET_STATUS.
bool gotDeathNotice = false;
// Local-IO callback for "notices" (installed in main via set_cb).
// Only ProcessDeath notices are expected. Such a notice is traced when
// tracing is enabled, and if it names the process registered through
// CMD_REG_INTEREST (victimName), gotDeathNotice is set. Any other notice
// type (NodeDown, NodeUp, Shutdown, TmSync*, ...) is reported as unexpected.
// The trailing int argument is unused.
void recv_notice_msg(struct message_def *recv_msg, int )
{
    // Anything other than a process death is not something we asked for.
    if ( recv_msg->type != MsgType_ProcessDeath )
    {
        printf("[%s] unexpected notice, type=%s\n", MyName,
               MessageTypeString( recv_msg->type));
        return;
    }

    if ( tracing )
    {
        printf("[%s] Process death notice received for %s (%d, %d:%d),"
               " trans_id=%lld.%lld.%lld.%lld., aborted=%d\n",
               MyName,
               recv_msg->u.request.u.death.process_name,
               recv_msg->u.request.u.death.nid,
               recv_msg->u.request.u.death.pid,
               recv_msg->u.request.u.death.verifier,
               recv_msg->u.request.u.death.trans_id.txid[0],
               recv_msg->u.request.u.death.trans_id.txid[1],
               recv_msg->u.request.u.death.trans_id.txid[2],
               recv_msg->u.request.u.death.trans_id.txid[3],
               recv_msg->u.request.u.death.aborted);
    }

    // Only the registered victim's death counts; the flag is never cleared.
    const bool isVictim =
        strcmp(recv_msg->u.request.u.death.process_name, victimName) == 0;
    if ( isVictim )
    {
        gotDeathNotice = true;
    }
}
void processCommands()
{
MPI_Comm worker_comm;
int servNid;
int servPid;
Verifier_t servVerifier;
int rc;
MPI_Status status;
int recvbuf[6];
char sendbuf[100];
bool done = false;
const int serverTag = 100;
if ( tracing )
{
printf ("[%s] Port: %s\n", MyName, util.getPort());
}
if ( tracing )
{
printf ("[%s] Wait to connect.\n", MyName);
}
XMPI_Comm_accept (util.getPort(), MPI_INFO_NULL, 0, MPI_COMM_SELF,
&worker_comm);
XMPI_Comm_set_errhandler (worker_comm, MPI_ERRORS_RETURN);
if ( tracing )
{
printf ("[%s] Connected.\n", MyName);
}
do
{
rc = XMPI_Recv (recvbuf, 6, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG,
worker_comm, &status);
if (rc == MPI_SUCCESS)
{
switch (recvbuf[0])
{
case CMD_REG_INTEREST:
strcpy(victimName, (char *)&recvbuf[1]);
if ( tracing )
{
printf("[%s] got command CMD_REG_INTEREST for %s.\n",MyName,
victimName);
}
if (util.requestProcInfo( victimName
, servNid
, servPid
, servVerifier))
{
_TM_Txid_External transid = {{0LL, 0LL, 0LL, 0LL}};
util.requestNotice( servNid
, servPid
, servVerifier
, ""
, false
, transid);
}
strcpy(sendbuf, "OK");
break;
case CMD_GET_STATUS:
if ( tracing )
{
printf("[%s] got command CMD_GET_STATUS.\n",MyName);
}
if (gotDeathNotice)
strcpy(sendbuf, "OK");
else
strcpy(sendbuf, "FAILED");
break;
case CMD_END:
if ( tracing )
{
printf("[%s] got command CMD_END.\n",MyName);
}
strcpy(sendbuf, "OK");
done = true;
break;
default:
sprintf (sendbuf, "[%s] Received (%d:%d) UNKNOWN", MyName,
recvbuf[0], recvbuf[1]);
}
rc = XMPI_Send (sendbuf, (int) strlen (sendbuf) + 1, MPI_CHAR, 0,
serverTag, worker_comm);
}
else
{ // Receive failed
printf("[%s] XMPI_Recv failed, rc = (%d) %s\n",
MyName, rc, util.MPIErrMsg(rc));
done = true;
}
}
while (!done);
if ( tracing )
{
printf ("[%s] disconnecting.\n", MyName);
}
util.closeProcess ( worker_comm );
}
// main: start up as a monitor-managed test process, serve the controller's
// commands via processCommands(), then tell the monitor we are exiting and
// release the port and local-IO object before exiting with status 0.
int main (int argc, char *argv[])
{
    // No victim registered yet; CMD_REG_INTEREST fills this in.
    victimName[0] = '\0';

    // Configuration from the command line.
    util.processArgs (argc, argv);
    tracing = util.getTrace();
    MyName  = util.getProcName();

    // InitLocalIO is expected to create gp_local_mon_io; notices (including
    // process-death notices) are routed to recv_notice_msg through it.
    util.InitLocalIO( );
    assert (gp_local_mon_io);
    gp_local_mon_io->set_cb(recv_notice_msg, "notice");

    util.requestStartup ();

    // Blocks until the controller sends CMD_END or communication fails.
    processCommands();

    // Shutdown: notify the monitor, flush output, release resources.
    util.requestExit ( );
    fflush (stdout);
    XMPI_Close_port( util.getPort() );
    delete gp_local_mon_io; // no-op when the pointer is null

    exit (0);
}