blob: 8d3873e4c3910faa4e24c6f04938a59249fa690a [file] [log] [blame]
//------------------------------------------------------------------
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "seabed/fs.h"
#include "seabed/fserr.h"
#include "seabed/ms.h"
#include "seabed/pctl.h"
#include "seabed/pevents.h"
#include "seabed/thread.h"
#include "ml.h"
#include "queue.h"
#include "tchkfe.h"
#include "tms.h"
#include "tmsfsutil.h"
#include "tutil.h"
#include "tutilp.h"
// Work item handed from the main (receiving) thread to thread_recv via sigq.
// iv_link must remain the FIRST member: items are queued with
// sigq.add(&l->iv_link), and thread_recv casts the pointer returned by
// sigq.remove() straight back to Link*.
typedef struct Link {
    SB_ML_Type iv_link;  // queue linkage; must stay first (see above)
    char *ip_buf;        // request buffer; NULL marks the stop sentinel (queue_stop)
    int iv_msgid;        // msgid of the request to reply to
} Link;
// ---- process-wide state ----
// true once main() learns (via is_backup()) that this process is the backup;
// selects backup behavior in checkpoint() and process_request().
bool im_backup = false;
// Arguments saved after msg_mon_process_startup(); reused to start the backup.
int my_argc;
char **my_argp;
// This process's name (msg_mon_get_my_process_name); the backup gets the same name.
char my_name[BUFSIZ];
// This process's node id / pid, filled in by is_backup().
int my_nid = -1;
int my_pid = -1;
// Backup (peer) process: node, open id, handle and pid, set by start_backup().
int peer_nid = -1;
int peer_oid;
TPT_DECL (peer_phandle);
int peer_pid = -1;
// Program path used to start the backup ($PWD/argv[0], built in main()).
char prog[MS_MON_MAX_PROCESS_PATH];
// Request/checkpoint receive buffer and client send buffer.
char recv_buffer[40000];
SB_Thread::Thread *thr_recv; // worker thread running thread_recv()
char send_buffer[40000];
// Set by recv() when a monitor Shutdown message arrives.
bool shutdown_rcvd = false;
// Queue of Link items from the main thread to thread_recv().
SB_Sig_Queue sigq("sigq", false);
// Tested by recv() and main(); NOTE(review): nothing in the code shown ever
// sets it to true - confirm how takeover is meant to be signalled.
bool takeoverv = false;
// -trip N: the backup aborts after processing N checkpoints (0 = never).
int trip = 0;
// -v: verbose tracing.
bool verbose = false;
// forwards
static int checkpoint(MS_SRE *sre, void *buf, int *size);
static void process_request(int msgid, char *recvbuf);
static int recv(MS_SRE *sre, void *buf, int *size);
static void start_backup(int nid);
// myassert(exp): test-local assertion macro.
//  - NDEBUG not defined: on failure, reports the expression, file and line
//    through myassertfun(), which aborts.
//  - NDEBUG and USE_ASSERT_ABORT: on failure, calls abort() directly.
//  - NDEBUG only: compiled out; exp is not evaluated.
# ifdef NDEBUG
#  ifdef USE_ASSERT_ABORT
// abort() returns void, and a void expression cannot be an operand of ||.
// The comma expression gives the right-hand operand type int.
#   define myassert(exp) (void)((exp)||(abort(), 0))
#  else
#   define myassert(exp) ((void)0)
#  endif
# else
#  define myassert(exp) (void)((exp)||(myassertfun(#exp, __FILE__, __LINE__), 0))
# endif // NDEBUG
// Failure handler behind myassert().
// Writes the failed expression, source file and line, tagged with this
// process's pid, to stderr, flushes it, and aborts. It never returns.
void myassertfun(const char *exp, const char *file, unsigned line) {
    const int self = (int) getpid();
    fprintf(stderr,
            "TEST Assertion failed (%d): %s, file %s, line %u\n",
            self, exp, file, line);
    fflush(stderr);
    abort();
}
// Worker-thread body.
// Repeatedly takes a Link from sigq. For each real request it replies
// (process_request) and then calls checkpoint() with a zero length.
// A Link whose ip_buf is NULL (queued by queue_stop) ends the loop.
// Every dequeued Link is deleted here.
void *thread_recv(void *arg) {
    MS_SRE sre; // out-parameter for checkpoint()
    (void) arg; // unused
    for (;;) {
        Link *l = (Link *) sigq.remove();
        char *buf = l->ip_buf;
        if (buf == NULL) {
            // Stop sentinel: check this before touching iv_msgid, which a
            // sentinel need not initialize.
            delete l;
            break;
        }
        int msgid = l->iv_msgid;
        delete l;
        process_request(msgid, buf);
        int len = 0;
        checkpoint(&sre, buf, &len);
    }
    return NULL;
}
// Entry point passed to SB_Thread::Thread. It forwards to thread_recv() and
// returns that function's result.
void *thread_recv_fun(void *arg) {
    void *rc = thread_recv(arg);
    return rc;
}
// Exchanges one checkpoint between the two halves of the process pair.
//
// Backup (im_backup): blocks in recv() until the primary sends a checkpoint.
// buf/size receive the data and sre receives the listen result. Returns
// XZFIL_ERR_PATHDOWN when recv() returns non-zero (takeover or shutdown);
// otherwise returns XZFIL_ERR_OK.
//
// Primary: links *size bytes of buf to the backup (peer_phandle) with no
// reply buffers. It then waits for LDONE, listens for the completion into
// sre, and breaks the link. Link and break errors are deliberately ignored,
// except that a break error restarts the backup via start_backup(-1).
// Returns the XMSG_BREAK_ result.
int checkpoint(MS_SRE *sre, void *buf, int *size) {
    int disable;
    int ferr;
    int lerr;
    if (im_backup) {
        if (verbose)
            printf("srv-b: Waiting for checkpoint.\n");
        if (recv(sre, buf, size))
            ferr = XZFIL_ERR_PATHDOWN;
        else
            ferr = XZFIL_ERR_OK;
    } else {
        int msgid;
        MS_Result_Type results;
        if (verbose)
            printf("srv-p: Sending checkpoint.\n");
        // NOTE(review): presumably suppresses msg-layer test asserts while
        // talking to a backup that may be gone - confirm.
        disable = msg_test_assert_disable();
        ferr = XMSG_LINK_(TPT_REF(peer_phandle), // phandle
                          &msgid, // msgid
                          NULL, // reqctrl
                          0, // reqctrlsize
                          NULL, // replyctrl
                          0, // replyctrlmax
                          (char *) buf, // reqdata
                          (ushort) *size, // reqdatasize
                          NULL, // replydata
                          0, // replydatamax
                          0, // linkertag
                          0, // pri
                          0, // xmitclass
                          XMSG_LINK_LDONEQ); // linkopts
        msg_test_assert_enable(disable);
        // ignore checkpoint error
        TEST_CHK_FEIGNORE(ferr);
        // NOTE(review): if XMSG_LINK_ itself failed, no link is outstanding;
        // confirm this LDONE wait cannot block forever in that case.
        lerr = XWAIT(LDONE, -1);
        TEST_CHK_WAITIGNORE(lerr);
        // NOTE(review): when called from thread_recv, the main thread may be
        // in recv() listening concurrently; confirm that the LDONEM listen
        // option keeps the two listeners from taking each other's events.
        lerr = XMSG_LISTEN_((short *) sre, // sre
                            XLISTEN_ALLOW_LDONEM, // listenopts
                            0); // listenertag
        TEST_CHK_WAITIGNORE(lerr);
        disable = msg_test_assert_disable();
        ferr = XMSG_BREAK_(sre->sre_msgId,
                           (short *) &results,
                           TPT_REF(peer_phandle));
        msg_test_assert_enable(disable);
        TEST_CHK_FEIGNORE(ferr);
        // A failed break is taken to mean the backup is gone: start a new one.
        if (ferr != XZFIL_ERR_OK)
            start_backup(-1);
    }
    return ferr;
}
bool is_backup() {
int ferr;
MS_Mon_Process_Info_Type proc_info;
ferr = msg_mon_get_process_info_detail(my_name, &proc_info);
TEST_CHK_FEOK(ferr);
my_nid = proc_info.nid;
my_pid = proc_info.pid;
bool backup = proc_info.backup ? true : false;
if (backup) {
ferr = msg_mon_register_death_notification(proc_info.parent_nid,
proc_info.parent_pid);
assert(ferr == XZFIL_ERR_OK);
}
return backup;
}
// Handles one received message and replies to it with empty data.
// The buffer contents are not inspected.
// In the backup, each call counts as one processed checkpoint. When -trip N
// is non-zero, the Nth such call aborts the process via
// util_abort_core_free().
void process_request(int msgid, char *recvbuf) {
    static int trip_cnt = 0; // checkpoints processed so far (backup only)
    (void) recvbuf;          // unused
    if (!im_backup) {
        if (verbose)
            printf("srv-p: processed client request\n");
    } else {
        if (verbose)
            printf("srv-b: processing checkpointed data\n");
        if (trip && ++trip_cnt >= trip)
            util_abort_core_free();
    }
    if (verbose)
        printf("srv-%c: sending reply\n", im_backup ? 'b' : 'p');
    // msgid, no reply ctrl, no reply data, errorclass 0, no new phandle
    XMSG_REPLY_(msgid, NULL, 0, NULL, 0, 0, NULL);
}
void queue_process_request(char *recvbuf, int msgid) {
Link *l = new Link;
l->ip_buf = recvbuf;
l->iv_msgid = msgid;
sigq.add(&l->iv_link);
}
// Queues the stop sentinel (a Link with ip_buf == NULL) that makes
// thread_recv() exit its loop. thread_recv() deletes the Link.
void queue_stop() {
    Link *l = new Link;
    l->ip_buf = NULL;
    // Not a real request. Initialize the msgid anyway so that no reader of the
    // sentinel can observe an indeterminate value.
    l->iv_msgid = -1;
    sigq.add(&l->iv_link);
}
// Waits for the next non-monitor message and reads its data into buf.
//
// Monitor (system) messages that arrive while waiting are read and replied
// to. A Shutdown monitor message sets shutdown_rcvd.
//
// sre:  out - listen result of the last message received.
// buf:  out - receives the request data.
// size: in  - capacity of buf in bytes;
//       out - number of bytes stored in buf (the request size, clamped to
//             the capacity).
// Returns:
//   0 - a non-monitor request was received;
//   1 - a monitor message arrived while takeoverv was set;
//   2 - a monitor message arrived after shutdown was received.
int recv(MS_SRE *sre, void *buf, int *size) {
    int ferr;
    int lerr;
    bool mon_msg;
    int nid;
    int pid;
    do {
        do {
            lerr = XWAIT(LREQ, -1);
            TEST_CHK_WAITIGNORE(lerr);
            lerr = XMSG_LISTEN_((short *) sre, // sre
                                0,             // listenopts
                                0);            // listenertag
        } while (lerr == XSRETYPE_NOWORK);
        mon_msg = (sre->sre_flags & XSRE_MON);
        if (mon_msg) {
            int msg_size = sre->sre_reqDataSize;
            char *msg = new char[msg_size];
            ferr = XMSG_READDATA_(sre->sre_msgId, // msgid
                                  msg,            // reqdata
                                  (ushort) msg_size); // bytecount
            assert(ferr == XZFIL_ERR_OK);
            MS_Mon_Msg *mon_msgp = (MS_Mon_Msg *) msg;
            if (mon_msgp->type == MS_MsgType_Shutdown) {
                shutdown_rcvd = true;
                if (verbose)
                    printf("srv-%c: shutdown rcvd\n", im_backup ? 'b' : 'p');
            }
            XMSG_REPLY_(sre->sre_msgId, // msgid
                        NULL,           // replyctrl
                        0,              // replyctrlsize
                        NULL,           // replydata
                        0,              // replydatasize
                        0,              // errorclass
                        NULL);          // newphandle
            delete [] msg;
            if (takeoverv)
                return 1;
            if (shutdown_rcvd)
                return 2;
        } else {
            const int capacity = *size;
            ferr = XMSG_READDATA_(sre->sre_msgId, // msgid
                                  (char *) buf,   // reqdata
                                  (ushort) capacity); // bytecount
            assert(ferr == XZFIL_ERR_OK);
            // bytecount bounds the copy, so never report more bytes than
            // buf can hold, even when the request is larger.
            const int req_size = sre->sre_reqDataSize;
            *size = (req_size < capacity) ? req_size : capacity;
            ferr = XMSG_GETREQINFO_(MSGINFO_NID,
                                    sre->sre_msgId,
                                    &nid);
            assert(ferr == XZFIL_ERR_OK);
            ferr = XMSG_GETREQINFO_(MSGINFO_PID,
                                    sre->sre_msgId,
                                    &pid);
            assert(ferr == XZFIL_ERR_OK);
            if (verbose) {
                // NOTE(review): prints the first three ints of buf even when
                // fewer bytes arrived (e.g. zero-length checkpoints).
                int *lbuf = (int *) buf;
                printf("srv-%c: received from p-id=%d/%d: %d.%d.%d\n",
                       im_backup ? 'b' : 'p',
                       nid, pid, lbuf[0], lbuf[1], lbuf[2]);
            }
        }
    } while (mon_msg);
    return 0;
}
// Starts a backup copy of this program (same name and arguments) and opens
// it as this process's backup.
// The nid argument is ignored: it is overwritten with this process's own
// node id. The backup is requested on node 1 - nid, which assumes node ids
// 0 and 1.
// On return peer_phandle, peer_oid, peer_nid and peer_pid describe the
// backup.
void start_backup(int nid) {
    int ferr = msg_mon_get_process_info(NULL, &nid, &peer_pid);
    TEST_CHK_FEOK(ferr);

    peer_nid = 1 - nid;
    peer_pid = -1; // reset before start; passed by pointer below
    if (verbose) {
        printf("srv-p: starting backup process with open.\n");
    }

    ferr = msg_mon_start_process(prog,                  // prog
                                 my_name,               // name
                                 NULL,                  // ret name
                                 my_argc,               // argc
                                 my_argp,               // argv
                                 TPT_REF(peer_phandle), // phandle
                                 0,                     // open
                                 &peer_oid,             // oid
                                 MS_ProcessType_Generic,// type
                                 0,                     // priority
                                 0,                     // debug
                                 1,                     // backup
                                 &peer_nid,             // nid
                                 &peer_pid,             // pid
                                 NULL,                  // infile
                                 NULL);                 // outfile
    TEST_CHK_FEOK(ferr);

    ferr = msg_mon_open_process_backup(my_name, TPT_REF(peer_phandle), &peer_oid);
    TEST_CHK_FEOK(ferr);

    if (verbose) {
        printf("srv-p: after start_backup - peer p-id=%d/%d\n",
               peer_nid, peer_pid);
    }
}
// Test driver for the "$srv" primary/backup process pair.
//
// Client mode (-client): opens $srv and sends `loop` requests of 39000
// bytes. It breaks each link and checks that no reply control data came back.
//
// Server mode runs once:
//  - A backup (is_backup()) loops in checkpoint(), handling the primary's
//    checkpoints, until recv() reports takeover (takeoverv) or shutdown.
//  - Unless shutdown was received, the process then acts as primary. It
//    starts a new backup and receives up to `loop` client requests. Each
//    request goes to thread_recv, which replies and checkpoints. Receiving
//    stops early on shutdown.
//
// Options:
//   -loop N  number of requests (default 10); server work is skipped if N <= 0
//   -trip N  backup aborts after N checkpoints
//   -v       verbose
int main(int argc, char *argv[]) {
    bool client = false;
    int err;
    int ferr;
    int inx;
    int len;
    int loop = 10;
    int msgid;
    int oid;
    TPT_DECL (phandle);
    void *result;
    RT results;
    MS_SRE sre;
    TAD zargs[] = {
        { "-client", TA_Bool, TA_NOMAX, &client },
        { "-loop",   TA_Int,  TA_NOMAX, &loop },
        { "-server", TA_Ign,  TA_NOMAX, NULL },
        { "-trip",   TA_Int,  TA_NOMAX, &trip },
        { "-v",      TA_Bool, TA_NOMAX, &verbose },
        { "",        TA_End,  TA_NOMAX, NULL }
    };

    msfs_util_init(&argc, &argv, msg_debug_hook);
    arg_proc_args(zargs, false, argc, argv);
    util_test_start(client);
    ferr = msg_mon_process_startup(!client); // system messages?
    TEST_CHK_FEOK(ferr);
    my_argc = argc; // after msg_init - it removes args
    my_argp = argv;
    if (client) {
        ferr = msg_mon_open_process((char *) "$srv", // name
                                    TPT_REF(phandle),
                                    &oid);
        TEST_CHK_FEOK(ferr);
    }
    ferr = msg_mon_get_my_process_name(my_name, sizeof(my_name));
    assert(ferr == XZFIL_ERR_OK);

    if (client) {
        for (inx = 0; inx < loop; inx++) {
            snprintf(send_buffer, sizeof(send_buffer),
                     "hello, greetings from %s, inx=%d", my_name, inx);
            ferr = XMSG_LINK_(TPT_REF(phandle), // phandle
                              &msgid,           // msgid
                              NULL,             // reqctrl
                              0,                // reqctrlsize
                              NULL,             // replyctrl
                              0,                // replyctrlmax
                              send_buffer,      // reqdata
                              39000,            // reqdatasize
                              recv_buffer,      // replydata
                              40000,            // replydatamax
                              0,                // linkertag
                              0,                // pri
                              0,                // xmitclass
                              0);               // linkopts
            util_check("XMSG_LINK_", ferr);
            ferr = XMSG_BREAK_(msgid, results.u.s, TPT_REF(phandle));
            util_check("XMSG_BREAK_", ferr);
            assert(results.u.t.ctrl_size == 0);
        }
    } else if (loop > 0) {
        // The server side runs exactly once; it must not be repeated per
        // client iteration.
        const char *pwd = getenv("PWD");
        // PWD may be unset, and passing NULL for %s is undefined behavior.
        snprintf(prog, sizeof(prog), "%s/%s",
                 pwd != NULL ? pwd : ".", argv[0]);
        msg_mon_enable_mon_messages(true); // get mon messages
        thr_recv = new SB_Thread::Thread(thread_recv_fun, "recv");
        thr_recv->start();
        if (is_backup()) {
            im_backup = true;
            while (!takeoverv) {
                len = sizeof(recv_buffer);
                ferr = checkpoint(&sre, recv_buffer, &len);
                if (ferr == XZFIL_ERR_OK)
                    process_request(sre.sre_msgId, recv_buffer);
                else if (shutdown_rcvd)
                    break;
            }
        }
        if (!shutdown_rcvd) {
            // Either a fresh primary or a backup that took over: from here
            // on this process acts as primary (sends checkpoints, no trip).
            im_backup = false;
            start_backup(-1);
            for (inx = 0; inx < loop; inx++) {
                len = sizeof(recv_buffer);
                if (recv(&sre, recv_buffer, &len) == 0)
                    queue_process_request(recv_buffer, sre.sre_msgId);
                else if (shutdown_rcvd)
                    break; // shutdown announced: stop waiting for requests
            }
        }
        queue_stop();
        err = thr_recv->join(&result);
        assert(err == 0);
        delete thr_recv;
        thr_recv = NULL;
    }

    if (client) {
        ferr = msg_mon_close_process(TPT_REF(phandle));
        TEST_CHK_FEOK(ferr);
    }
    ferr = msg_mon_process_shutdown();
    TEST_CHK_FEOK(ferr);
    util_test_finish(client);
    return 0;
}