blob: 190be7ed991b3ea413ab111a029d53f588a97d4d [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ExpLOBprocess.cpp
* Description: class to store and retrieve LOB info from mxlobsrvr process.
*
*
* Created: 10/29/2012
* Language: C++
*
*
*
*
*****************************************************************************
*/
/*** Note *** This file is currently compiled and creates the mxlobsrvr executable. But the functions in this file are not active or used at this point. Code
maybe added in the near future to offload any tasks like garbage collection, to this process .Hence we are retainign this file as part of the mxlobsrvr
infrastructure .If any functionas are added and need to be executed in the
mxlobsrvr process, the sqstart/sqstop need to modified to call lobstop and
lostart**/
/****************************************************************************/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <malloc.h>
#include <string>
#include <errno.h>
#include <sys/file.h>
#include <iostream>
#include <errno.h>
#include <fcntl.h> // for nonblocking
#include <netdb.h>
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <zlib.h> // ZLIB compression library
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/socket.h> // basic socket definitions
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h> // basic system data types
#include <sys/uio.h>
#include <sys/wait.h>
#include <guardian/kphandlz.h>
#include <seabed/ms.h>
#include <seabed/fs.h>
#include <seabed/pctl.h>
#include <seabed/pevents.h>
#include <seabed/fserr.h>
#include "ComRtUtils.h"
//#include "ExeReplInterface.h"
#include "Globals.h"
#include "NAExit.h"
#include "ex_ex.h" // ex_assert
#include "SCMVersHelp.h"
#define SQ_USE_LOB_PROCESS 1
#include "ExpLOBaccess.h"
#include "QRLogger.h"
#include "ExpLOBexternal.h"
extern int ms_transid_reg(MS_Mon_Transid_Type, MS_Mon_Transseq_Type);
extern void ms_transid_clear(MS_Mon_Transid_Type, MS_Mon_Transseq_Type);
extern "C" short GETTRANSID(short *transid);
extern "C" short JOINTRANSACTION(Int64 transid);
extern "C" short SUSPENDTRANSACTION(short *transid);
#define TRANSID_IS_VALID(idin) (idin.id[0] != 0)
using namespace std;
// Session State values
enum {
IDLE_STATE = 0,
WAITING = 1,
DATA_PENDING = 2,
WRITE_PENDING = 3,
SEND_PENDING = 4,
SENDING_DATA = 5,
RQST_PENDING = 6,
REPLY_PENDING = 7,
WAITING_REPLY = 8,
END_SESSION = 9,
COMM_RESET = 10,
READ_POSTED = 11,
// TCP/IP State Machine states
LISTENER_INIT_STATE = 20,
LISTENER_SOCKOPT_STATE = 21,
LISTENER_ACCEPT_STATE = 22,
LISTENER_SHUTDOWN_STATE = 23,
LISTENER_CLOSE_STATE = 24,
SESSION_INIT_STATE = 30,
SESSION_CONNECT_STATE = 31,
SESSION_CONNECT_CHECK_STATE = 32,
SESSION_SOCKOPT1_STATE = 33,
SESSION_SOCKOPT2_STATE = 34,
SESSION_RECV_STATE = 35,
SESSION_SEND_STATE = 36,
SESSION_SHUTDOWN_STATE = 37,
SESSION_CLOSE_STATE = 38
};
// AWAITIO classes
enum {Class_0 = 0,
Class_1 = 1,
Class_2 = 2,
Class_3 = 3,
Class_4 = 4,
Class_5 = 5,
Class_6 = 6,
Class_7 = 7};
void process_msg(BMS_SRE *sre)
{
// do work here
return;
}
class rcv_struct
{
public:
union {
unsigned char *databuf;
Lng32 *datalen;
};
Lng32 buflen;
short file;
short state;
};
#pragma fieldalign platform awaitio_tag_struct
class awaitio_tag_struct
{
public:
union {
SB_Tag_Type Tag;
struct {
ULng32 Class:4; // I/O class
ULng32 State:12; // State of this I/O operation.
ULng32 Index:16; // Index into control block table.
};
};
};
class awaitio_struct
{
public:
Lng32 Iocount;
short Error;
short File;
};
enum {IDLE_TIMEOUT = 30000, // 5 minute wait before stopping cli process.
MAX_RETRIES = 3, // Maximum number of retries before waiting.
NUM_OPENS = 64, // Initial number of entries in the OCB table.
RECV_BUFSIZE = 56 * 1024, // Size of $RECEIVE I/O buffer.
RETRY_TIMEOUT = 3000, // 30 second wait before retrying.
SOCK_BUFSIZE = 56 * 1024, // Size of socket I/O buffers.
STARTUP_TIMEOUT = 500}; // CLI Invokers will die if an open message is not
// received before a 5 sec startup timer expires.
rcv_struct rcv;
// Mutexes and thread variables
pthread_mutex_t cnfg_mutex;
pthread_mutex_t g_mutex;
pthread_attr_t thr_attr;
Lng32 total_dynamic_memory;
CliGlobals *cliGlobals = NULL;
char *myProgramName = NULL;
xzsys_ddl_smsg_def *sysmsg = NULL;
FS_Receiveinfo_Type rcvinfo;
ExLobGlobals *lobGlobals = NULL;
//*****************************************************************************
//*****************************************************************************
static void sigterm_handler(Lng32 signo)
{
printf("sigterm received\n");
}
void LOB_process_stop(short flag)
{
if (rcv.state == READ_POSTED)
BCANCEL(rcv.file);
else
NAExit(flag);
} // BDR_process_stop
//*****************************************************************************
static void *Calloc(size_t memsize)
{
void *ptr;
Lng32 retval;
retval = pthread_mutex_lock(&g_mutex);
ex_assert(retval == 0, "Calloc 1");
ptr = calloc(1, memsize);
ex_assert(ptr != NULL, "Calloc 2");
total_dynamic_memory += memsize;
retval = pthread_mutex_unlock(&g_mutex);
ex_assert(retval == 0, "Calloc 3");
return(ptr);
} // Calloc
void post_receive_read(void)
{
_bcc_status cc_status;
awaitio_tag_struct tag;
if (rcv.state == END_SESSION)
LOB_process_stop(0);
if (rcv.databuf == NULL)
{
rcv.buflen = RECV_BUFSIZE + 2*sizeof(int);
rcv.databuf = (unsigned char *)Calloc(rcv.buflen);
sysmsg = (xzsys_ddl_smsg_def *)&rcv.datalen[0];
}
tag.Tag = 0;
tag.Class = Class_7;
cc_status = BREADUPDATEX(rcv.file, (char *)&rcv.datalen[0], RECV_BUFSIZE,
NULL, tag.Tag);
ex_assert(_status_eq(cc_status), "post_receive_read 1");
rcv.state = READ_POSTED;
return;
} // post_receive_read
short process_open(void)
{
Lng32 retval;
short retvals = XZFIL_ERR_OK;
return retvals;
} // process_open
//*****************************************************************************
//*****************************************************************************
short process_close(void)
{
Lng32 retval;
short retvals = XZFIL_ERR_OK;
return retvals;
} // process_close
void process_mon_msg(MS_Mon_Msg *msg) {
printf("server received monitor msg, type=%d\n", msg->type);
switch (msg->type) {
case MS_MsgType_Change:
printf(" type=%d, group=%s, key=%s, value=%s\n",
msg->u.change.type,
msg->u.change.group,
msg->u.change.key,
msg->u.change.value);
break;
case MS_MsgType_Close:
printf(" nid=%d, pid=%d, process=%s, aborted=%d\n",
msg->u.close.nid,
msg->u.close.pid,
msg->u.close.process_name,
msg->u.close.aborted);
break;
case MS_MsgType_Event:
break;
case MS_MsgType_NodeDown:
printf(" nid=%d, node=%s\n",
msg->u.down.nid,
msg->u.down.node_name);
break;
case MS_MsgType_NodeUp:
printf(" nid=%d, node=%s\n",
msg->u.up.nid,
msg->u.up.node_name);
break;
case MS_MsgType_Open:
printf(" nid=%d, pid=%d, process=%s, death=%d\n",
msg->u.open.nid,
msg->u.open.pid,
msg->u.open.process_name,
msg->u.open.death_notification);
break;
case MS_MsgType_ProcessCreated:
printf(" nid=%d, pid=%d, tag=0x%llx, process=%s, ferr=%d\n",
msg->u.process_created.nid,
msg->u.process_created.pid,
msg->u.process_created.tag,
msg->u.process_created.process_name,
msg->u.process_created.ferr);
break;
case MS_MsgType_ProcessDeath:
printf(" nid=%d, pid=%d, aborted=%d, process=%s\n",
msg->u.death.nid,
msg->u.death.pid,
msg->u.death.aborted,
msg->u.death.process_name);
break;
case MS_MsgType_Service:
break;
case MS_MsgType_Shutdown:
printf(" nid=%d, pid=%d, level=%d\n",
msg->u.shutdown.nid,
msg->u.shutdown.pid,
msg->u.shutdown.level);
break;
case MS_MsgType_TmSyncAbort:
case MS_MsgType_TmSyncCommit:
break;
case MS_MsgType_UnsolicitedMessage:
break;
default:
break;
}
}
Ex_Lob_Error ExLob::getDesc(ExLobRequest *request)
{
Ex_Lob_Error err;
Lng32 clierr;
Int64 dummyParam;
Lng32 handleOutLen = 0;
Lng32 blackBoxLen = 0;
Int64 offset;
Int64 size;
clierr = SQL_EXEC_LOBcliInterface(request->getHandleIn(), request->getHandleInLen(),
request->getBlackBox(), &blackBoxLen,
request->getHandleOut(), &handleOutLen,
LOB_CLI_SELECT_UNIQUE, LOB_CLI_ExecImmed,
&offset, &size,
&dummyParam, &dummyParam,
0,
request->getTransId(), FALSE);
request->setHandleOutLen(handleOutLen);
request->setBlackBoxLen(blackBoxLen);
request->setCliError(clierr);
return LOB_OPER_OK;
}
void processRequest(ExLobRequest *request)
{
Ex_Lob_Error err = LOB_OPER_OK;
Int64 descNum;
Int64 dataOffset;
Int64 operLen;
ExLobDescHeader descHeader;
ExLob *lobPtr;
ExLobDesc desc;
ExLobDesc *descPtr;
Lng32 clierr;
Lng32 handleOutLen;
Int64 size;
err = lobGlobals->getLobPtr(request->getDescFileName(), lobPtr);
if (err != LOB_OPER_OK)
{
request->setError(LOB_INIT_ERROR);
return ;
}
if (!lobGlobals->isCliInitialized())
{
Lng32 clierr = SQL_EXEC_LOBcliInterface(0, 0,
0, 0,
0, 0,
LOB_CLI_INIT, LOB_CLI_ExecImmed,
0, 0,
0, 0,
0,
0,FALSE);
if (clierr < 0)
{
request->setError(LOB_INIT_ERROR);
return ;
}
lobGlobals->setCliInitialized();
}
switch(request->getType())
{
case Lob_Req_Get_Desc:
err = lobPtr->getDesc(request);
break;
default:
err = LOB_REQUEST_UNDEFINED_ERROR;
printf("bad request = %d\n", request->getType());
break;
};
request->setError(err);
return;
}
void receive_message(ExLobRequest *request)
{
Int64 transId;
int err;
int cliRC = GETTRANSID((short *)&transId);
printf("transid before setting = %ld\n", transId);
if (TRANSID_IS_VALID(request->getTransIdBig())) {
err = ms_transid_reg(request->getTransIdBig(), request->getTransStartId());
printf("transid reg err = %d\n", err);
} else if (request->getTransId()) {
err = JOINTRANSACTION(request->getTransId());
printf("join txn err = %d\n", err);
}
cliRC = GETTRANSID((short *)&transId);
printf("transid after setting = %ld\n", transId);
processRequest(request);
if (TRANSID_IS_VALID(request->getTransIdBig())) {
ms_transid_clear(request->getTransIdBig(), request->getTransStartId());
} else if (request->getTransId()) {
transId = request->getTransId();
SUSPENDTRANSACTION((short*)&transId);
}
return;
} // receive_message
Ex_Lob_Error ExLobGlobals::initialize()
{
lobMap_ = (lobMap_t *) new lobMap_t;
if (lobMap_ == NULL)
return LOB_INIT_ERROR;
return LOB_OPER_OK;
}
Ex_Lob_Error ExLobGlobals::getLobPtr(char *lobName, ExLob *& lobPtr)
{
Ex_Lob_Error err;
lobMap_t *lobMap = NULL;
lobMap_it it;
lobMap = lobGlobals->getLobMap();
it = lobMap->find(string(lobName));
if (it == lobMap->end())
{
lobPtr = new (lobGlobals->getHeap())ExLob(lobGlobals->getHeap(),NULL);
if (lobPtr == NULL)
return LOB_ALLOC_ERROR;
lobMap->insert(pair<string, ExLob*>(string(lobName), lobPtr));
}
else
{
lobPtr = it->second;
}
return LOB_OPER_OK;
}
Ex_Lob_Error ExLobGlobals::delLobPtr(char *lobName)
{
Ex_Lob_Error err;
lobMap_t *lobMap = NULL;
lobMap_it it;
lobMap = lobGlobals->getLobMap();
it = lobMap->find(string(lobName));
if (it != lobMap->end()) {
ExLob *lobPtr = it->second;
delete lobPtr;
lobMap->erase(it);
}
return LOB_OPER_OK;
}
Lng32 main(Lng32 argc, char *argv[])
{
Lng32 lv_event_len;
Lng32 lv_error;
Lng32 lv_ret;
BMS_SRE lv_sre;
Lng32 retval;
_bcc_status cc_status;
awaitio_tag_struct awaitTag;
awaitio_struct awaitIo;
// Register sigterm_handler as our signal handler for SIGTERM.
if (signal(SIGTERM, sigterm_handler) == SIG_ERR)
{
cout << "*** Cannot handle SIGTERM ***" << endl;
exit(1);
}
retval = pthread_mutex_init(&cnfg_mutex, NULL);
ex_assert(retval == 0, "main 1");
// seaquest related stuff
retval = msg_init_attach(&argc, &argv, true, (char *)"");
if (retval)
printf("msg_init_attach returned: %d\n", retval);
// sq_fs_dllmain();
msg_mon_process_startup(true);
msg_mon_enable_mon_messages(1);
retval = atexit((void(*)(void))msg_mon_process_shutdown);
if (retval != 0)
{
cout << "*** atexit failed with error " << retval << " ***" << endl;
exit(1);
}
// setup log4cxx
QRLogger::initLog4cxx(QRLogger::QRL_LOB);
// initialize lob globals
lobGlobals = new ExLobGlobals(NULL);
if (lobGlobals == NULL)
return -1;
retval = lobGlobals->initialize();
if (retval != LOB_OPER_OK)
return -1;
/* Lng32 clierr = SQL_EXEC_LOBcliInterface(0, 0,
0, 0,
0, 0,
LOB_CLI_INIT, LOB_CLI_ExecImmed,
0, 0,
0, 0,
0,
0);
if (clierr < 0)
return -1; */
bool done = false;
BMS_SRE sre;
Lng32 len;
char recv_buffer[BUFSIZ];
Lng32 err;
printf("lob process initialialized. Ready for requests\n");
while(!done)
{
do {
XWAIT(LREQ, -1);
err = BMSG_LISTEN_((short *) &sre, 0, 0);
}
while (err == BSRETYPE_NOWORK);
err = BMSG_READDATA_(sre.sre_msgId, recv_buffer, BUFSIZ);
if ((sre.sre_flags & XSRE_MON)) {
MS_Mon_Msg *msg = (MS_Mon_Msg *)recv_buffer;
process_mon_msg(msg);
if (msg->type == MS_MsgType_Shutdown)
done = true;
len = 0;
} else {
receive_message((ExLobRequest *)recv_buffer);
len = sizeof(ExLobRequest);
}
BMSG_REPLY_(sre.sre_msgId,
NULL,
0,
recv_buffer,
len,
0,
NULL);
}
msg_mon_process_shutdown();
return 0;
}