blob: 9f0b18eba3786c1a3d45de9cdf14d9edb302355e [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: udrserv.cpp
* Description: This is the main program for UDR Server process. The tasks for
* this process are to :
* . Handle messages to and from the Master Executor
* . process SP requests from executor
* . process result set requests from the executor/CLI
*
* Created: 01/01/2001
* Language: C++
*
*****************************************************************************
*/
#include "Platform.h"
#include <fstream>
#include <iostream>
#include "NAMemory.h"
#include "ComTransInfo.h"
#include "udrserv.h"
#include "UdrStreams.h"
#include "ComDiags.h"
#include "udrdecs.h"
#include "ErrorMessage.h"
#include "UdrFFDC.h"
#include "UdrDebug.h"
#include "UdrAbortCallBack.h"
#include "LmJavaOptions.h"
#include "UdrCfgParser.h"
#include "ComRtUtils.h"
#include "LmLangManagerJava.h"
#include "LmLangManagerC.h"
#include "LmRoutine.h"
#include "ComDefs.h"
#include "sqludr.h"
#include "NAUserId.h"
#include "SCMVersHelp.h"
DEFINE_DOVERS(tdm_udrserv)
#include "dtm/tm.h"
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#define GETPID getpid
#include "Measure.h"
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#ifdef UDR_OSS_DEBUG
#include <signal.h>
#endif
#ifdef UDR_DEBUG
#include "HeapLog.h"
#endif
// Global pointer to the UdrGlobals instance
UdrGlobals *UDR_GLOBALS = NULL;
extern THREAD_P jmp_buf ExportJmpBuf;
extern THREAD_P jmp_buf* ExportJmpBufPtr;
void processASessionMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrSessionMsg &request);
void processAnEnterTxMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrEnterTxMsg &request);
void processASuspendTxMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrSuspendTxMsg &request);
void processAnExitTxMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrExitTxMsg &request);
void processALoadMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrLoadMsg &request,
IpcEnvironment &env);
void processAnUnLoadMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrUnloadMsg &request);
void processAnInvokeMessage(UdrGlobals *UdrGlob,
UdrServerDataStream &msgStream,
UdrDataBuffer &request);
void processAnRSLoadMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrRSLoadMsg &request);
void processAnRSCloseMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrRSCloseMsg &request);
void processAnRSUnloadMessage(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
UdrRSUnloadMsg &request);
NABoolean processCmdLine(UdrGlobals *UdrGlob, Int32 argc, char **argv);
// static long getClientId(UdrGlobals *udrGlob);
void processARequest(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
IpcEnvironment &env);
static void verifyUdrServer(UdrGlobals &glob);
static const char *LmResultToString(const LmResult &r);
static void DumpDiags(ostream &stream, ComDiagsArea *d, const char *prefix);
static void DumpProcessInfo();
static ComDiagsArea *addOrCreateErrorDiags(UdrGlobals *UdrGlob,
Lng32 errorNumber,
Lng32 intErrorInfo,
const char *charErrorInfo,
ComDiagsArea *diags);
//
// Function to invoke a Java method using the Language Manager
// (LM) API. This function supports command-line LM method invocations
// and is never called when MXUDR runs as a server process.
//
static const char *MXUDR_PREFIX = "[MXUDR]";
static const char *MXUDR_PREFIX_PLUS_SPACE = "[MXUDR] ";
static FILE *MXUDR_OUTFILE = stdout; // TODO: fix when stderr available
static Int32 invokeUdrMethod(const char *method,
const char *container,
const char *path,
NABoolean isJava,
NABoolean isUdf,
NABoolean txRequired,
NABoolean useVarchar,
Int32 argc,
char *argv[],
Int32 nResultSets,
Int32 nTimesToInvoke,
UdrGlobals &glob);
// Dead Code
// These methods are not used, and the interface has not been tested for a long time.
// We might want to retire them
static Int32 processCommandsFromFile(const char *filename, UdrGlobals &glob);
static Int32 processSingleCommandFromFile(FILE *f, UdrGlobals &glob);
static NAString initErrText("");
/*************************************************************************
Helper function to propagate all Java-related environment settings
found in an optional configuration file into an LmJavaOptions instance.
File must be in location indicated by envvar TRAFUDRCFG, or if not found,
use default of $TRAF_CONF/trafodion.udr.config
*************************************************************************/
void readCfgFileSection ( const char *section, LmJavaOptions &javaOptions )
{
char *p = NULL;
char buffer [BUFFMAX+1];
while ((UdrCfgParser::readSection(section, buffer, sizeof(buffer), initErrText )) > 0)
{
UDR_DEBUG2("[%s] entry: %s", section, buffer);
if (strcmp(section, "java") == 0)
{
javaOptions.addOption(buffer, TRUE);
}
else
if (strcmp(section, "env") == 0)
{
if ((p = UdrCfgParser::textPos( buffer, "CLASSPATH" )))
{
javaOptions.addSystemProperty("java.class.path", p);
}
else
if ((p = UdrCfgParser::textPos( buffer, "JREHOME" )))
{
javaOptions.addSystemProperty("sqlmx.udr.jrehome", p);
}
putenv(buffer);
}
else
{
break;
}
} //while
}
void InitializeJavaOptionsFromEnvironment(LmJavaOptions &javaOptions
,NAMemory *heap
)
{
char *val;
/* Look for java startup options and envvars in configuration file */
if (UdrCfgParser::cfgFileIsOpen(initErrText))
{
readCfgFileSection("env", javaOptions);
readCfgFileSection("java", javaOptions);
UdrCfgParser::closeCfgFile();
}
if ((val = getenv("TRAF_UDR_JAVA_OPTIONS")))
{
const char *delimiters = " \t";
bool newlineIsPresent = (strchr(val, '\n') ? true : false);
if (newlineIsPresent)
{
delimiters = "\n";
}
javaOptions.addOptions(val, delimiters, TRUE);
}
}
/*************************************************************************
// Helper function to change =_DEFAULTS define value.
// MXOSRVR gets the =_DAFAULTS value that contains the primary node name.
// This causes problem if the MXOSRVR is on secondary nodes. gethostbyname()
// tries to open TCPIP process on primary node instead of secondary node.
//
// This method changes the system name in the =_DEFAULTS define
// to current node name.
*************************************************************************/
//-----------------------------------------------------------------
static void runServer(Int32 argc, char **argv);
Int32 main(Int32 argc, char **argv)
{
dovers(argc, argv);
file_debug_hook("udrserv", "udrserv.hook");
try {
file_init_attach(&argc, &argv, TRUE, (char*) "");
}
catch (SB_Fatal_Excep &e) {
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, e.what(), 0);
exit(1);
}
try {
file_mon_process_startup(true);
}
catch (SB_Fatal_Excep &e) {
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, e.what(), 0);
exit(1);
}
// Commented out the call to my_mpi_fclose() below as
// the function UdrExitHandler(in file UdrFFDC.cpp) takes care of
// calling my_mpi_fclose() so we do not need to register that
// function here as an exit handler.
// atexit(my_mpi_fclose);
LOG_AGENT
UdrAbortCallBack udrACB;
registerAbortCallBack(&udrACB);
UDR_DEBUG_SIGNAL_HANDLERS("[MXUDR] The original signal handlers");
setUdrSignalHandlers();
saveUdrTrapSignalHandlers();
UDR_DEBUG_SIGNAL_HANDLERS("[MXUDR] Saved UDR trap signal handlers");
setExitHandler();
UDR_DEBUG0("[MXUDR] Registered Exit handler");
// setup log4cxx,
QRLogger::initLog4cxx(QRLogger::QRL_UDR);
// Synchronize C and C++ output streams
ios::sync_with_stdio();
// Redirect stdout and stderr to files named in environment
// variables
const char *stdOutFile = getenv("SQLMX_UDR_STDOUT");
const char *stdErrFile = getenv("SQLMX_UDR_STDERR");
Int32 fdOut = -1;
Int32 fdErr = -1;
if (stdOutFile && stdOutFile[0])
{
fdOut = open(stdOutFile,
O_WRONLY | O_APPEND | O_CREAT | O_SYNC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (fdOut >= 0)
{
fprintf(stdout, "[Redirecting MXUDR stdout to %s]\n", stdOutFile);
fflush(stdout);
dup2(fdOut, fileno(stdout));
}
else
{
fprintf(stdout, "*** WARNING: could not open %s for redirection: %s.\n",
stdOutFile, strerror(errno));
}
}
if (stdErrFile && stdErrFile[0])
{
fdErr = open(stdErrFile,
O_WRONLY | O_APPEND | O_CREAT | O_SYNC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (fdErr >= 0)
{
fprintf(stdout, "[Redirecting MXUDR stderr to %s]\n", stdErrFile);
fflush(stdout);
dup2(fdErr, fileno(stderr));
}
else
{
fprintf(stdout, "*** WARNING: could not open %s for redirection: %s.\n",
stdErrFile, strerror(errno));
}
}
runServer(argc, argv);
if (fdOut >= 0)
{
close(fdOut);
}
if (fdErr >= 0)
{
close(fdErr);
}
return 0;
}
static void runServer(Int32 argc, char **argv)
{
#ifdef _DEBUG
UDR_DEBUG1("Process ID: %ld", (Lng32) GETPID());
UDR_DEBUG0("[BEGIN argv]");
Int32 i;
for (i = 0; i < argc; i++)
{
if (argv[i])
UDR_DEBUG1(" '%s'", argv[i]);
else
UDR_DEBUG0(" NULL");
}
UDR_DEBUG0("[END argv]");
DumpProcessInfo();
#endif
if (setjmp(ExportJmpBuf))
{
UDR_ASSERT(0, "An ExportJmpBuf longjmp occurred.");
}
ExportJmpBufPtr = &ExportJmpBuf;
const char *moduleName = "runServer";
NABoolean showMsgBox = FALSE;
static Int32 pid = getpid();
char stmp[255];
sprintf(stmp,"Process Launched %d", pid);
if (getenv("SQL_MSGBOX_PROCESS") != NULL)
{
MessageBox(NULL, stmp , "tdm_udrserv", MB_OK|MB_ICONINFORMATION);
showMsgBox = TRUE;
}
#ifdef _DEBUG
putenv((char *)"MXUDR_DEBUG_BUILD=1");
#endif
#ifdef UDR_OSS_DEBUG
// enable more strict heap checking for the debug build, so
// that we catch more bugs.
if (getenv("DBG_HEAP_CHECK") != NULL)
heap_check_always(1);
#endif
#ifndef NDEBUG
if (getenv("DEBUG_SERVER") != NULL)
{
DebugBreak();
}
#endif // NDEBUG
UdrTraceFile = stdout;
// We now create two heaps. One is the "IPC" heap and is intended
// for objects that only persist while we are processing the current
// request. The other is the "UDR" heap and is intended for global
// objects or anything that must not go away after we stop
// processing the current request.
NAHeap *udrHeap = new NAHeap("UDR Global Heap",
NAMemory::DERIVED_FROM_SYS_HEAP,
256 * 1024 // 256K block size
);
NAHeap *ipcHeap = new NAHeap("UDR IPC Heap",
NAMemory::DERIVED_FROM_SYS_HEAP,
256 * 1024 // 256K block size
);
#ifdef UDR_DEBUG
if (getenv("SQLMX_UDR_MEMORY_LOG"))
HeapLogRoot::control(LOG_START);
#endif
UDR_GLOBALS = new (udrHeap) UdrGlobals(udrHeap, ipcHeap);
// Move environment settings into the global LmJavaOptions object
InitializeJavaOptionsFromEnvironment(*(UDR_GLOBALS->getJavaOptions()),
udrHeap);
if (processCmdLine(UDR_GLOBALS, argc, argv))
{
return;
}
GuaReceiveControlConnection *ctrlConn = UDR_GLOBALS->getControlConnection();
IpcEnvironment *env = UDR_GLOBALS->getIpcEnvironment();
// Wait for the first open message
ctrlConn->waitForMaster();
doMessageBox(UDR_GLOBALS, TRACE_SHOW_DIALOGS,
UDR_GLOBALS->showMain_, "GotConnection");
NABoolean doTrace = (UDR_GLOBALS->verbose_ &&
UDR_GLOBALS->traceLevel_ >= TRACE_IPMS &&
UDR_GLOBALS->showMain_) ? TRUE : FALSE;
while (1)
{
if (env->getAllConnections()->entries() > 0)
{
NAList<UdrServerReplyStream *> &replyStreams =
UDR_GLOBALS->getReplyStreams();
while(replyStreams.entries())
{
UdrServerReplyStream *stream = replyStreams[0];
replyStreams.removeAt(0);
#ifdef _DEBUG
Lng32 crashPoint = 0;
#endif
if (stream->moreObjects())
{
UdrIpcObjectType msgType =
(UdrIpcObjectType) stream->getNextObjType();
#ifdef _DEBUG
// Bring the process down if an environment variable value
// matches the message type
const char *crashString = getenv("MXUDR_CRASH_POINT");
if (crashString && crashString[0])
crashPoint = atol(crashString);
else
crashPoint = 0;
if (crashPoint == (Lng32) msgType)
{
ServerDebug(" CRASH POINT is %ld (%s)", crashPoint,
GetUdrIpcTypeString((UdrIpcObjectType) crashPoint));
ServerDebug(" MXUDR about to exit");
exit(1);
}
#endif
// Do some real work on the incoming message
processARequest(UDR_GLOBALS, *stream, *env);
#ifdef _DEBUG
// Bring the process down if the environment variable checked
// earlier has a negative value and its positive value matches
// the message type
if (crashPoint && (-crashPoint == (Lng32) msgType))
{
if (crashPoint == -UDR_MSG_DATA_HEADER ||
crashPoint == -UDR_MSG_CONTINUE_REQUEST ||
crashPoint == -UDR_MSG_RS_DATA_HEADER ||
crashPoint == -UDR_MSG_RS_CONTINUE)
{
// For data requests, let's wait for 5 sec before crashing
Sleep(5 * 1000); // 5 seconds in millisec
}
ServerDebug(" CRASH POINT is %ld (%s)", crashPoint,
GetUdrIpcTypeString((UdrIpcObjectType) -crashPoint));
ServerDebug(" MXUDR about to exit");
exit(1);
}
#endif
}
// All messages in the stream are processed
stream->addToCompletedList();
}
// All work is completed, now wait on all connections.
if (doTrace)
ServerDebug("[UdrServ (%s)] Wait for next message\n", moduleName);
env->deleteCompletedMessages();
// This change is made because QCD6 changes with Gil's code
// checked in 9/05/26, all UDR tests were failing.
// The UDR server was crashing in waitOnSet() in IPc.cpp file
// when it was making system call WAIT.
// So this is the work around for time being. Is is required
// to change other places after consulting Gil when he
// comes back from vacation.
// env->getAllConnections()->waitOnAll(IpcInfiniteTimeout);
ctrlConn->wait(IpcInfiniteTimeout);
} // if (env->getAllConnections()->entries() > 0)
} // while (1)
} // runServer
void processARequest(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
IpcEnvironment &env)
{
const char *moduleName = "processARequest";
IpcMessageType msgType;
IpcMessageObjVersion msgVer;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, moduleName);
UdrGlob->objectCount_++;
UdrGlob->numReqSP_++;
UdrGlob->numReqUDR_++;
UdrGlob->requestCount_++;
msgType = msgStream.getNextObjType();
msgVer = msgStream.getNextObjVersion();
UdrGlob->currentMsgSize_ = msgStream.getNextObjSize();
if (UdrGlob->verbose_ &&
UdrGlob->traceLevel_ >= TRACE_IPMS &&
UdrGlob->showMain_)
{
ServerDebug("[UdrServ (%s)] Processing Request# %ld",
moduleName,
UdrGlob->objectCount_);
ServerDebug(" Msg Type: %ld (%s)",
(Lng32) msgType,
GetUdrIpcTypeString(UdrIpcObjectType(msgType)));
ServerDebug(" Msg Version: %ld", (Lng32) msgVer);
}
if (!IsNAStringSpaceOrEmpty(initErrText))
{
if (msgType == UDR_MSG_LOAD)
{
controlErrorReply(UdrGlob,
msgStream,
UDR_ERR_MESSAGE_PROCESSING, //errorNumber
0, //intErrorInfo,
initErrText.data() //charErrorInfo
);
return;
}
}
//
// Each case in the following switch statement is for a different
// message type.
//
// For control messages we do the following:
// - Some bookkeeping in the UdrGlobals object
// - Extract the incoming message from the message stream
// - Call a global function to process the request and return a reply
//
// For data messages our work is different. We have to route the
// incoming message to the appropriate buffered stream. The routing
// involves using the UdrHandle from the incoming message to perform
// a lookup into the global SPInfo list.
//
switch (msgType)
{
case UDR_MSG_SESSION:
{
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "Session Type");
UdrSessionMsg &request = *(new (UdrGlob->getIpcHeap())
UdrSessionMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processASessionMessage(UdrGlob, msgStream, request);
request.decrRefCount();
}
break;
case UDR_MSG_LOAD:
{
UdrGlob->numReqLoadSP_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "Load Type");
UdrLoadMsg &request = *(new (UdrGlob->getIpcHeap())
UdrLoadMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processALoadMessage(UdrGlob, msgStream, request, env);
request.decrRefCount();
}
break;
case UDR_MSG_UNLOAD:
{
UdrGlob->numReqUnloadSP_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "Unload Type");
UdrUnloadMsg &request = *(new (UdrGlob->getIpcHeap())
UdrUnloadMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processAnUnLoadMessage(UdrGlob, msgStream, request);
request.decrRefCount();
#ifdef UDR_DEBUG
if (getenv("SQLMX_UDR_MEMORY_LOG"))
{
printf("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n");
printf("Memory usage after processing request: %s \n",
GetUdrIpcTypeString(UdrIpcObjectType(msgType)));
printf("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n");
HeapLogRoot::display(FALSE);
}
#endif
}
break;
case UDR_MSG_DATA_HEADER:
{
UdrGlob->numReqDataSP_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "Invoke Type");
UdrDataHeader h(INVALID_UDR_HANDLE, NULL);
msgStream >> h;
SPInfo *sp = UdrGlob->getSPList()->spFind(h.getHandle());
if (sp)
{
UdrServerDataStream *other = sp->getDataStream();
msgStream.routeMessage(*other);
}
else
{
UdrGlob->numErrDataSP_++;
controlErrorReply(UdrGlob,
msgStream,
UDR_ERR_MISSING_UDRHANDLE,
0,
"Data header"
);
}
} // case UDR_MSG_DATA_HEADER
break;
case UDR_MSG_TMUDF_DATA_HEADER:
{
UdrGlob->numReqDataSP_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "TMUDF Request");
UdrTmudfDataHeader h(INVALID_UDR_HANDLE, NULL);
msgStream >> h;
SPInfo *sp = UdrGlob->getSPList()->spFind(h.getHandle());
if (sp)
{
UdrServerDataStream *other = sp->getDataStream();
msgStream.routeMessage(*other);
}
else
{
UdrGlob->numErrRSFetch_++;
controlErrorReply(UdrGlob,
msgStream,
UDR_ERR_MISSING_UDRHANDLE,
0,
"TMUDF Data header"
);
}
} // case UDR_MSG_TMUDF_DATA_HEADER
break;
case UDR_MSG_CONTINUE_REQUEST:
{
UdrGlob->numReqContinueSP_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "Continue Type");
UdrContinueMsg r(INVALID_UDR_HANDLE, NULL);
msgStream >> r;
SPInfo *sp = UdrGlob->getSPList()->spFind(r.getHandle());
if (sp)
{
UdrServerDataStream *other = sp->getDataStream();
msgStream.routeMessage(*other);
}
else
{
UdrGlob->numErrContinueSP_++;
controlErrorReply (UdrGlob,
msgStream,
UDR_ERR_MISSING_UDRHANDLE,
0,
"Continue request"
);
}
} // case UDR_MSG_CONTINUE_REQUEST
break;
case UDR_MSG_RS_LOAD:
{
UdrGlob->numReqRSLoad_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "RS Load Type");
UdrRSLoadMsg &request = *(new (UdrGlob->getIpcHeap())
UdrRSLoadMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processAnRSLoadMessage(UdrGlob, msgStream, request);
request.decrRefCount();
}
break;
case UDR_MSG_RS_DATA_HEADER:
{
UdrGlob->numReqRSFetch_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "RS Fetch Type");
UdrRSDataHeader h(INVALID_UDR_HANDLE, NULL);
msgStream >> h;
SPInfo *sp = UdrGlob->getSPList()->spFind(h.getHandle());
if (sp)
{
UdrServerDataStream *other = sp->getDataStream();
msgStream.routeMessage(*other);
}
else
{
UdrGlob->numErrRSFetch_++;
controlErrorReply(UdrGlob,
msgStream,
UDR_ERR_MISSING_UDRHANDLE,
0,
"RS Data header"
);
}
} // case UDR_MSG_RS_DATA_HEADER
break;
case UDR_MSG_RS_CONTINUE:
{
UdrGlob->numReqRSContinue_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "RS Continue Type");
UdrRSContinueMsg &request = *(new (UdrGlob->getIpcHeap())
UdrRSContinueMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
SPInfo *sp = UdrGlob->getSPList()->spFind(request.getHandle());
if (sp)
{
UdrServerDataStream *other = sp->getDataStream();
msgStream.routeMessage(* other);
}
else
{
UdrGlob->numErrRSContinue_++;
controlErrorReply(UdrGlob,
msgStream,
UDR_ERR_MISSING_UDRHANDLE,
0,
"RS Continue"
);
}
request.decrRefCount();
}
break;
case UDR_MSG_RS_CLOSE:
{
UdrGlob->numReqRSClose_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "RS Close Type");
UdrRSCloseMsg &request = *(new (UdrGlob->getIpcHeap())
UdrRSCloseMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processAnRSCloseMessage(UdrGlob, msgStream, request);
request.decrRefCount();
}
break;
case UDR_MSG_RS_UNLOAD:
{
UdrGlob->numReqRSUnload_++;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "RS Unload Type");
UdrRSUnloadMsg &request = *(new (UdrGlob->getIpcHeap())
UdrRSUnloadMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processAnRSUnloadMessage(UdrGlob, msgStream, request);
request.decrRefCount();
}
break;
case UDR_MSG_ENTER_TX:
{
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "ENTER TX Type");
UdrEnterTxMsg &request = *(new (UdrGlob->getIpcHeap())
UdrEnterTxMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processAnEnterTxMessage(UdrGlob, msgStream, request);
request.decrRefCount();
} // case UDR_MSG_ENTER_TX
break;
case UDR_MSG_SUSPEND_TX:
{
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "SUSPEND TX Type");
UdrSuspendTxMsg &request = *(new (UdrGlob->getIpcHeap())
UdrSuspendTxMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processASuspendTxMessage(UdrGlob, msgStream, request);
request.decrRefCount();
} // case UDR_MSG_SUSPEND_TX
break;
case UDR_MSG_EXIT_TX:
{
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, "EXIT TX Type");
UdrExitTxMsg &request = *(new (UdrGlob->getIpcHeap())
UdrExitTxMsg(UdrGlob->getIpcHeap()));
msgStream >> request;
processAnExitTxMessage(UdrGlob, msgStream, request);
request.decrRefCount();
} // case UDR_MSG_EXIT_TX
break;
default:
{
UdrGlob->numErrUDR_++;
controlErrorReply(UdrGlob,
msgStream,
UDR_ERR_UNKNOWN_MSG_TYPE,
(Lng32) msgType,
NULL);
} // default
break;
} // switch (msgType)
// reset currSP_
UdrGlob->setCurrSP (NULL);
} // processARequest()
// If param 'sp' is set, we check if we have to quiesce executor.
// If it's NULL, we do not do any attempt to quiesce executor. So it's
// preferable to set it to NULL, if the caller knows quiesce is not needed.
void sendControlReply(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
SPInfo *sp)
{
const char *moduleName = "sendControlReply";
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, moduleName);
NABoolean doTrace = (UdrGlob->verbose_ &&
UdrGlob->traceLevel_ >= TRACE_IPMS &&
UdrGlob->showMain_) ? TRUE : FALSE;
UdrGlob->replyCount_++;
if (doTrace)
ServerDebug("[UdrServ (%s)] Sending reply# %d to client",
moduleName,
UdrGlob->replyCount_);
if (sp)
{
sp->prepareToReply(msgStream);
}
else
{
// We are replying to a request that does not require an SPInfo.
// We assume that we are already in the correct transaction related
// to this request (and there is no problem even if we are in different
// transaction) and these requests do not need executor to be quiesced.
// Do nothing.
}
NABoolean waited = TRUE;
msgStream.send(waited);
if (doTrace)
{
CollIndex i = 0;
msgStream.getRecipients().setToNext(i);
IpcConnection *conn = msgStream.getRecipients().element(i);
ServerDebug(" Sent reply %ld to client on connection 0x%08X",
UdrGlob->replyCount_,
conn);
ServerDebug("[UdrServ (%s)] Done", moduleName);
}
} // sendControlReply()
// If param 'sp' is set, we check if we have to quiesce executor.
// If it's NULL, we do not do any attempt to quiesce executor. So it's
// preferable to set it to NULL, if the caller knows quiesce is not needed.
void sendDataReply(UdrGlobals *UdrGlob,
UdrServerDataStream &msgStream,
SPInfo *sp)
{
const char *moduleName = "sendDataReply";
NABoolean doTrace = (UdrGlob->verbose_ &&
UdrGlob->traceLevel_ >= TRACE_IPMS &&
UdrGlob->showMain_) ? TRUE : FALSE;
if (UdrGlob->traceLevel_ >= TRACE_SHOW_DIALOGS && UdrGlob->showMain_)
{
MessageBox(NULL, moduleName , UdrGlob->serverName_,
MB_OK | MB_ICONINFORMATION);
}
UdrGlob->replyCount_++;
if (doTrace)
ServerDebug("[UdrServ (%s)] Sending reply# %d to client",
moduleName,
UdrGlob->replyCount_);
if (sp)
{
sp->prepareToReply(msgStream);
}
else
{
// We are replying to a request that does not require an SPInfo.
// We assume that we are already in the correct transaction related
// to this request (and there is no problem even if we are in different
// transaction) and these requests do not need executor to be quiesced.
// Do nothing.
}
msgStream.sendResponse();
// We used to have the following code. But after having many
// connections in UDR server, the following check may not be
// correct. Also, there is no easy way of getting connection
// pointer from buffered stream. Btw, this may not be necessary.
// The regressions are working fine without this 'wait' loop.
// One day we can remove this completely once we are sure that
// we are not breaking nowait apps. If we have problems with nowait
// apps, we will have to fix this.
//
// With buffered data streams we need to make sure all
// response buffers have been sent before returning from this
// function. There is no "waited" send method on the buffered
// stream class so we implement the "wait" here.
//
// IpcConnection *conn = env.getControlConnection()->getConnection();
// while (conn && conn->sendIOPending())
// {
// conn->wait(IpcInfiniteTimeout);
// }
if (doTrace)
{
ServerDebug(" Sent reply %ld to client", UdrGlob->replyCount_);
ServerDebug("[UdrServ (%s)] Done", moduleName);
}
} // sendDataReply()
static void displayUsageInfo()
{
fprintf(stdout, "Usage:\n");
fprintf(stdout, " mxudr -help\n");
fprintf(stdout, " Prints this help information.\n");
fprintf(stdout, "\n");
fprintf(stdout, " mxudr -verify\n");
fprintf(stdout, " Verify UDR Server and Language Manager startup.\n");
fprintf(stdout, "\n");
#ifdef _DEBUG
// The "-invoke" and "-obey" options are supported in the release
// build but we do not tell users about them in this help message.
fprintf(stdout, " mxudr -invoke [-n N] [-param java|c] [-rs numRS]\n");
fprintf(stdout, " [-noTx] [-vc] <method> <file> <path> [args]\n");
fprintf(stdout, " Invoke a method using the Language Manager\n");
fprintf(stdout, "\n");
fprintf(stdout, " -n N : N is number of times to invoke <method>.\n");
fprintf(stdout, " Default is 1.\n");
fprintf(stdout, " -param java|c : Parameter style. Default is Java.\n");
fprintf(stdout, " -rs numRS : Number of ResultSet parameters. For.\n");
fprintf(stdout, " Java only. Default is 0.\n");
fprintf(stdout, " -noTx : <method> is executed without a transaction.\n");
fprintf(stdout, " -vc : Parameters are considered VARCHAR.\n");
fprintf(stdout, " If not specified, parameters are CHAR.\n");
fprintf(stdout, " <method> : Name of Java method or C function.\n");
fprintf(stdout, " <file> : Name of Java class file or C DLL.\n");
fprintf(stdout, " <path> : Location of <file>.\n");
fprintf(stdout, " [args] : Arguments separated by spaces.\n");
fprintf(stdout, "\n");
fprintf(stdout, " mxudr -obey <filename>\n");
fprintf(stdout, " Processes commands from a text file <filename>\n");
fprintf(stdout, "\n");
#endif
fprintf(stdout, "\n");
}
NABoolean processCmdLine(UdrGlobals *UdrGlob, Int32 argc, char **argv)
{
const char *moduleName = "processCmdLine";
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, moduleName);
char tmp1[255];
if (!IsNAStringSpaceOrEmpty(initErrText) )
{
if (!stricmp(argv[1], "-obey") || !stricmp(argv[1], "-help")
|| !stricmp(argv[1], "-invoke") || !stricmp(argv[1], "-verify") )
{
ComDiagsArea *initDiags = addOrCreateErrorDiags(UdrGlob,
UDR_ERR_MESSAGE_PROCESSING,
0,
initErrText.data(),
NULL);
DumpDiags(cout, initDiags, MXUDR_PREFIX_PLUS_SPACE);
return TRUE;
}
}
//
// Process environmental variables...
//
UdrGlob->verbose_ = FALSE;
UdrGlob->traceLevel_ = 0;
UdrGlob->logFileProvided_ = FALSE;
// NSK or Linux can use environment variables...
if (const char *logFileName = getenv("UDRSERV_TRACE_FILENAME"))
{
UdrTraceFile = fopen(logFileName, "a");
if (UdrTraceFile == NULL)
{
UdrTraceFile = stdout;
sprintf(tmp1, " Log File: Std Out - Rejected: %.60s", logFileName);
}
else
{
UdrGlob->logFileProvided_ = TRUE;
sprintf(tmp1, " Log File: %.60s", logFileName);
}
}
else
{
UdrTraceFile = stdout;
sprintf(tmp1, "\tLog File: Std Out");
}
if (UdrGlob->verbose_)
{
ServerDebug("[UdrServ (%.30s)] Runtime Parameters:", moduleName);
ServerDebug(tmp1);
ServerDebug(" Trace Level %d", UdrGlob->traceLevel_);
ServerDebug(" Verbose Mode Active");
}
if (UdrGlob->showInvoke_)
ServerDebug(" Trace Invoke Module Active");
if (UdrGlob->showLoad_)
ServerDebug(" Trace Load Module Active");
if (UdrGlob->showMain_)
ServerDebug(" Trace Main Module Active");
if (UdrGlob->showSPInfo_)
ServerDebug(" Trace SPInfo Module Active");
if (UdrGlob->showUnload_)
ServerDebug(" Trace Unload Module Active");
//
// Now we parse command-line arguments. We support the following
// command-line operations:
//
// -help Display a usage message
//
// -verify Load the Language Manager and exit
//
// -invoke Call a UDR method via the Language Manager
//
// -obey Process commands from a text file
//
for (Int32 i = 1; i < argc; i++)
{
if (!stricmp(argv[i], "-help"))
{
UdrGlob->setCommandLineMode(TRUE);
displayUsageInfo();
exit(1);
}
else if (!stricmp(argv[i], "-verify"))
{
UdrGlob->setCommandLineMode(TRUE);
verifyUdrServer(*UdrGlob);
exit(0);
}
// See if an LM method invocation is being requested. Syntax for
// LM method invocations is shown in the displayUsageInfo()
// function.
else if (!stricmp(argv[i], "-invoke"))
{
UdrGlob->setCommandLineMode(TRUE);
Int32 nTimesToInvoke = 1;
Int32 nResultSets = 0;
NABoolean usageError = FALSE;
const char *paramStyle = "java";
NABoolean txRequired = TRUE;
NABoolean isUdf = FALSE;
NABoolean useVarchar = FALSE;
// This while loop scans for invoke options that begin with a
// dash.
while (!usageError && ((i + 1) < argc))
{
if (argv[i + 1][0] == '-')
{
i++;
if (!stricmp(argv[i], "-n"))
{
if ((i + 1) < argc)
{
i++;
nTimesToInvoke = atoi(argv[i]);
}
else
{
usageError = TRUE;
}
}
else if (!stricmp(argv[i], "-rs"))
{
if ((i + 1) < argc)
{
i++;
nResultSets = atoi(argv[i]);
}
else
{
usageError = TRUE;
}
}
else if (!stricmp(argv[i], "-param"))
{
if ((i+1) < argc)
{
i++;
paramStyle = argv[i];
}
else
{
usageError = TRUE;
}
}
else if (!stricmp(argv[i], "-notx"))
{
txRequired = FALSE;
}
else if (!stricmp(argv[i], "-vc"))
{
useVarchar = TRUE;
}
else if (!stricmp(argv[i], "-udf"))
{
isUdf = TRUE;
}
else
{
usageError = TRUE;
}
}
else
{
break;
}
} // while (!usageError && ((i + 1) < argc))
// We must have at least 3 more arguments for method, container,
// and path
if (argc <= (i + 3))
{
usageError = TRUE;
}
// We expect param styles "c" and "java"
if (stricmp(paramStyle, "c") == 0)
{
// Do not manage a transaction for C routines
txRequired = FALSE;
}
else if (stricmp(paramStyle, "java") == 0)
{
// Nothing to do
}
else
{
usageError = TRUE;
}
if (usageError)
{
displayUsageInfo();
exit(1);
}
const char *method = argv[++i];
const char *container = argv[++i];
const char *path = argv[++i];
Int32 methodArgc = argc - (i + 1);
char **methodArgv = NULL;
if (methodArgc > 0)
{
methodArgv = &(argv[i + 1]);
}
NABoolean isJava = (stricmp(paramStyle, "java") == 0 ? TRUE : FALSE);
// If the user is invoking a C routine, assume it is a UDF not a
// stored procedure
if (!isJava)
isUdf = TRUE;
Int32 result = invokeUdrMethod(method, container, path,
isJava, isUdf, txRequired, useVarchar,
methodArgc, methodArgv, nResultSets,
nTimesToInvoke, *UdrGlob);
exit(result);
} // -invoke
// See if the user wants us to obey commands from a text file
else if (!stricmp(argv[i], "-obey"))
{
// Dead Code
// obey option is not tested, and Andy thinks it should be obsoleted.
UdrGlob->setCommandLineMode(TRUE);
NABoolean usageError = FALSE;
if ((i + 1) >= argc)
{
usageError = TRUE;
}
if (usageError)
{
displayUsageInfo();
exit(1);
}
i++;
Int32 result = processCommandsFromFile(argv[i], *UdrGlob);
exit(result);
} // -obey
} // for each arg in argv
return FALSE;
} // processCmdLine
#if 0
//
// $$$$ getClientId() is disabled for now. It is returning errors
// after certain system messages arrive on $RECEIVE. It needs to be
// redesigned. When it gets redesigned, it's not clear whether a
// certain technique is valid. The UDR server layer (i.e. this source
// file) calls receive() on an IpcMessageStream object and following
// receive() the UDR server layer calls FILE_GETRECEIVEINFO_ to see
// which process sent the last request. I don't think the UDR server
// layer should be querying Guardian for info about $RECEIVE. The IPC
// layer is managing $RECEIVE and the UDR server probably should't
// assume that the next message being provided by the IPC layer is the
// most recent arrival on $RECEIVE. The system message problem is one
// example. The IPC layer can have data to present to the UDR server
// layer but that data is not the most recent $RECEIVE arrival. The
// most recent $RECEIVE arrival might have been a system message that
// is of no concern to the UDR server, e.g. a remote CPU went down.
//
Lng32 getClientId(UdrGlobals *UdrGlob)
{
// Returns either 0 OK or UDR_ERR_TOO_MANY_OPENERS
const char *moduleName = "getClientId";
GuaErrorNumber guaErrorInfo;
GuaProcessHandle openPhandle;
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, moduleName);
// get current clients process ID
guaErrorInfo = FILE_GETRECEIVEINFO_((short *) &receiveInfo);
if (guaErrorInfo != GuaOK)
{
UDR_ASSERT(0, "Fatal error in FILE_GETRECEIVEINFO_");
}
openPhandle = receiveInfo.phandle_;
short retcode = GETTRANSID(UdrGlob->transId_);
// Locate ID in openerProcessId list
for (Int32 i=0; i<UdrGlob->gNumOpeners_; i++)
{
if (openPhandle == UdrGlob->gOpenerPhandle_[i])
{
return 0;
}
}
// Not found, add to list
if (UdrGlob->gNumOpeners_ < UDRMAXOPENERS_V100)
{
// Add to list
UdrGlob->gOpenerPhandle_[UdrGlob->gNumOpeners_++] = openPhandle;
return 0;
}
// List is full
return UDR_ERR_TOO_MANY_OPENERS;
} // getClientId
#endif
//
// Helper functions to return a UdrErrorReply object followed by a SQL
// diagnostics area on a UDR stream. The diags area will contain a
// single condition. The intErrorInfo and charErrorInfo parameters are
// optional information items for a particular condition. For example,
// the "invalid handle" condition requires one additional string item
// so the caller of this function should pass a valid charErrorInfo
// pointer when returning that condition.
//
// Code outside this file will use the global functions
// controlErrorReply() and dataErrorReply() to send UdrErrorReply
// objects.
//
static ComDiagsArea *addOrCreateErrorDiags(UdrGlobals *UdrGlob,
Lng32 errorNumber,
Lng32 intErrorInfo,
const char *charErrorInfo,
ComDiagsArea *diags)
{
if (!diags)
{
diags = ComDiagsArea::allocate(UdrGlob->getIpcHeap());
}
if (diags && errorNumber != 0)
{
switch (errorNumber)
{
case UDR_ERR_UNKNOWN_MSG_TYPE:
case UDR_ERR_TOO_MANY_OPENERS:
*diags << DgSqlCode(-errorNumber);
*diags << DgInt0((Int32)intErrorInfo);
break;
case UDR_ERR_MISSING_UDRHANDLE:
case UDR_ERR_MISSING_LMROUTINE:
case UDR_ERR_MISSING_RSHANDLE:
*diags << DgSqlCode(-errorNumber);
if (charErrorInfo)
{
*diags << DgString0(charErrorInfo);
}
break;
case UDR_ERR_INVALID_RS_INDEX:
*diags << DgSqlCode(-errorNumber)
<< DgString0(charErrorInfo)
<< DgInt0((Int32) intErrorInfo);
break;
case UDR_ERR_INVALID_RS_STATE:
*diags << DgSqlCode(-errorNumber);
*diags << DgString0(charErrorInfo);
break;
default:
{
*diags << DgSqlCode(-UDR_ERR_MESSAGE_PROCESSING);
switch (intErrorInfo)
{
case INVOKE_ERR_NO_REQUEST_BUFFER:
*diags << DgString0("Invoke: no request buffer.");
break;
case INVOKE_ERR_NO_INPUT_ROW:
*diags << DgString0("Invoke: no input row.");
break;
case INVOKE_ERR_NO_REPLY_DATA_BUFFER:
*diags << DgString0("Invoke: allocate reply buffer failed.");
break;
case INVOKE_ERR_NO_REPLY_ROW:
*diags << DgString0("Invoke: allocate reply row failed.");
break;
case INVOKE_ERR_NO_ERROR_ROW:
*diags << DgString0("Invoke: allocate error row failed.");
break;
case INVOKE_ERR_NO_REPLY_BUFFER:
*diags << DgString0("Allocate reply buffer failed.");
break;
case RS_ERR_NO_REPLY_MSG:
*diags << DgString0("Allocate RS reply failed.");
break;
case RS_ERR_NO_REPLY_DATA_BUFFER:
*diags << DgString0("Allocate RS reply data buffer failed.");
break;
default:
if (charErrorInfo)
*diags << DgString0(charErrorInfo);
else
*diags << DgString0("");
break;
} // switch (intErrorInfo)
} // default
break;
} // switch (errorNumber)
} // if (diags)
return diags;
}
static void doErrorDebugging(UdrGlobals *UdrGlob,
const char *moduleName,
Lng32 errorNumber, Lng32 intErrorInfo,
const char *charErrorInfo)
{
/*
char errorText[MAXERRTEXT];
sprintf(errorText, "(%.30s) UDR Error Reply: %d, %d",
moduleName, errorNumber, intErrorInfo);
ServerDebug("[UdrServ (%s)] %s\n", moduleName, errorText);
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS,
UdrGlob->showMain_, errorText);
*/
if (UdrGlob->verbose_ &&
UdrGlob->traceLevel_ >= TRACE_IPMS &&
UdrGlob->showMain_)
{
ServerDebug("[UdrServ (%s)] send error reply", moduleName);
}
}
void controlErrorReply(UdrGlobals *UdrGlob,
UdrServerReplyStream &msgStream,
Lng32 errorNumber,
Lng32 intErrorInfo,
const char *charErrorInfo)
{
const char *moduleName = "controlErrorReply";
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS, UdrGlob->showMain_, moduleName);
UdrErrorReply *reply =
new (UdrGlob->getIpcHeap()) UdrErrorReply(UdrGlob->getIpcHeap());
msgStream.clearAllObjects();
msgStream << *reply;
reply->decrRefCount();
ComDiagsArea *diags = addOrCreateErrorDiags(UdrGlob,
errorNumber,
intErrorInfo,
charErrorInfo,
NULL);
if (diags)
{
msgStream << *diags;
diags->decrRefCount();
}
doErrorDebugging(UdrGlob, moduleName,
errorNumber, intErrorInfo, charErrorInfo);
sendControlReply(UdrGlob, msgStream, NULL);
UdrGlob->numErrSP_++;
} // controlErrorReply
void dataErrorReply(UdrGlobals *UdrGlob,
UdrServerDataStream &msgStream,
Lng32 errorNumber,
Lng32 intErrorInfo,
const char *charErrorInfo,
ComDiagsArea *diags)
{
const char *moduleName = "dataErrorReply";
doMessageBox(UdrGlob, TRACE_SHOW_DIALOGS, UdrGlob->showMain_, moduleName);
// Free up any receive buffers no longer in use
msgStream.cleanupBuffers();
UdrErrorReply *reply =
new (UdrGlob->getIpcHeap()) UdrErrorReply(UdrGlob->getIpcHeap());
msgStream << *reply;
reply->decrRefCount();
if (!diags)
diags = addOrCreateErrorDiags(UdrGlob,
errorNumber,
intErrorInfo,
charErrorInfo,
NULL);
if (diags)
{
msgStream << *diags;
diags->decrRefCount();
}
doErrorDebugging(UdrGlob, moduleName,
errorNumber, intErrorInfo, charErrorInfo);
UdrGlob->numErrUDR_++;
UdrGlob->numErrSP_++;
UdrGlob->numErrInvokeSP_++;
// UDR_ASSERT(0, "UDR Data Error Reply");
} // dataErrorReply
static void verifyUdrServer(UdrGlobals &glob){
LmResult result = LM_OK;
NAMemory *h = glob.getUdrHeap();
//
// Create a diags area
//
ComDiagsArea *diags = ComDiagsArea::allocate(h);
// Create an LM instance
LmLanguageManager *lm = glob.getOrCreateLM(result,
COM_LANGUAGE_JAVA,
diags);
if (!lm && result == LM_OK)
{
result = LM_ERR;
}
// Destroy the LM and the JVM. This will flush any JVM output
// buffers such as those containing data from the -verbose or
// -Xrunhprof options.
glob.destroyJavaLMAndJVM();
fflush(stderr);
fflush(stdout);
if (result == LM_OK)
{
fprintf(stdout, "MXUDR Status: OK!\n");
}
else
{
fprintf(stdout, "MXUDR Status: Errors were encountered!\n");
fprintf(stdout, "Language Manager constructor returned %s\n",
LmResultToString(result));
DumpDiags(cout, diags, "");
}
}
static const char *LmResultToString(const LmResult &r)
{
switch (r)
{
case LM_OK: return "LM_OK";
case LM_ERR: return "LM_ERR";
case LM_CONT_NO_READ_ACCESS: return "LM_CONT_NO_READ_ACCESS";
case LM_CONV_REQUIRED: return "LM_CONV_REQUIRED";
case LM_CONV_ERROR: return "LM_CONV_ERROR";
case LM_PARAM_OVERFLOW: return "LM_PARAM_OVERFLOW";
default: return ComRtGetUnknownString((Int32) r);
}
}
static void DumpDiags(ostream &stream, ComDiagsArea *d, const char *prefix)
{
if (d && d->getNumber() > 0)
{
stream << prefix << "BEGIN SQL diagnostics" << endl;
NADumpDiags(stream, // ostream &
d, // ComDiagsArea *
TRUE, // add newline after each diagnostic?
FALSE, // to avoid "--" comment prefix
NULL, // FILE *
1 // for verbose output
);
stream << prefix << "END SQL diagnostics" << endl;
}
}
// Function to invoke a routine body using the Language Manager (LM)
// API. This function supports command-line invocations and is never
// called when MXUDR runs as a server.
//
// Assumptions
// * the number of arguments to the routine is the number of strings
// passed on the command-line
// * all arguments are character strings
// * For Java
// (1) the argument type must be String[]
// (2) All arguments are considered INOUT params, except as given in (3)
// (3) for 'main' method the arguments are considered to be IN params
//
static Int32 invokeUdrMethod(const char *method,
const char *container,
const char *path,
NABoolean isJava,
NABoolean isUdf,
NABoolean txRequired,
NABoolean useVarchar,
Int32 argc,
char *argv[],
Int32 nResultSets,
Int32 nTimesToInvoke,
UdrGlobals &glob)
{
LmResult result = LM_OK;
FILE *f = MXUDR_OUTFILE;
const char *prefix = MXUDR_PREFIX;
const char *prefixPlusSpace = MXUDR_PREFIX_PLUS_SPACE;
NAMemory *h = glob.getUdrHeap();
Int32 i;
Lng32 cliResult = 0;
fprintf(f, "%s Preparing to invoke a routine body\n", prefix);
fprintf(f, "%s Language %s, routine type %s\n", prefix,
(isJava ? "JAVA" : "C"), (isUdf ? "UDF" : "STORED PROCEDURE"));
fprintf(f, "%s Container: %s/%s\n", prefix, path, container);
fprintf(f, "%s Routine: %s\n", prefix, method);
fprintf(f, "%s The routine will be invoked %d time%s\n",
prefix, nTimesToInvoke, (nTimesToInvoke == 1 ? "" : "s"));
for (i = 0; i < argc; i++)
{
fprintf(f, "%s argv[%d] = \"%s\"\n", prefix, i, argv[i]);
}
fflush(f);
// Create a diags area
ComDiagsArea *diags = ComDiagsArea::allocate(h);
// Create an LM instance
fprintf(f, "%s About to create an LM instance\n", prefix);
fflush(f);
LmLanguageManager *lm =
glob.getOrCreateLM(result,
isJava ? COM_LANGUAGE_JAVA : COM_LANGUAGE_C,
diags);
if (!lm && result == LM_OK)
{
// This is not supposed to happen. We will treat it the same as
// LM_ERR.
result = LM_ERR;
}
fprintf(f, "%s LM constructor returned %s\n",
prefix, LmResultToString(result));
fflush(f);
DumpDiags(cout, diags, prefixPlusSpace);
// For Java we need to assemble the method signature. All parameters
// are considered INOUT strings and return type is void.
// One exception is parameters to 'main' method are considered
// IN strings and the signature contains only one String param.
char *sig = NULL;
if (isJava && result == LM_OK)
{
const char *stringSig = "[Ljava/lang/String;";
const char *rsSig = "[Ljava/sql/ResultSet;";
Int32 stringSigLen = str_len(stringSig);
Int32 rsSigLen = str_len(rsSig);
sig = new (h) char [(argc * stringSigLen) + (nResultSets * rsSigLen) + 20];
sig[0] = 0;
str_cat(sig, "(", sig);
if (strcmp(method, "main") == 0)
{
str_cat(sig, stringSig, sig);
}
else
{
for (i = 0; i < argc; i++)
str_cat(sig, stringSig, sig);
for (i = 0; i < nResultSets; i++)
str_cat(sig, rsSig, sig);
}
str_cat(sig, ")V", sig);
fprintf(f, "%s Java signature: %s\n", prefix, sig);
fflush(f);
}
// Allocate buffers for input and output below while setting up
// LmParameter structures.
char *inputRow = NULL, *outputRow = NULL;
// Prepare an LmParameter array. For UDFs we need one additional
// LmParameter for the return value. We assume the output value will
// be a character string of no more than 32 bytes.
ComUInt32 maxUdfOutLen = 32;
LmParameter *lmParams = NULL;
Int32 numLmParams = argc;
if (isUdf && result == LM_OK)
numLmParams++;
Int32 inputRowLen = 0;
Int32 outputRowLen = 0;
if (result == LM_OK)
{
// Find the input row length
for (i = 0; i < argc; i++)
{
// * 2 bytes null indicator always aligned on 8-byte boundary
// * 2 bytes for alignment
// * for VARCHAR values, 2 bytes for the length indicator
// * data
inputRowLen += (2 + 2 + (useVarchar ? 2 : 0) + str_len(argv[i]));
inputRowLen = ROUND8(inputRowLen);
}
// Find the output row length
if (isUdf)
{
// * 2 bytes null indicator
// * 2 bytes for alignment
// * for VARCHAR values, 2 bytes for the length indicator
// * data (with room for a null terminator)
outputRowLen = (2 + 2 + (useVarchar ? 2 : 0) + maxUdfOutLen + 1);
outputRowLen = ROUND8(outputRowLen);
}
else
{
// For SPJs, all the parameters are assumed to be INOUT. So,
// input and output row lengths are same
outputRowLen = inputRowLen;
}
// Assign memory for the input and output rows
inputRow = (char *) h->allocateMemory(inputRowLen);
memset(inputRow, 0, inputRowLen);
outputRow = (char *) h->allocateMemory(outputRowLen);
memset(outputRow, 0, outputRowLen);
if (numLmParams > 0)
{
lmParams = (LmParameter *)
h->allocateMemory(numLmParams * sizeof(LmParameter));
memset(lmParams, 0, numLmParams * sizeof(LmParameter));
}
ComFSDataType fsType (useVarchar ? COM_VCHAR_FSDT : COM_FCHAR_FSDT);
Int32 currParamOffset = 0;
for (i = 0; i < argc; i++)
{
currParamOffset = ROUND8(currParamOffset);
Int32 argLen = str_len(argv[i]);
// Figure out the type and varchar indicator offsets
Int32 vcIndOffset = 0, vcIndLen = 0;
UInt32 dataOffset = currParamOffset + 4;
if (useVarchar)
{
vcIndOffset = currParamOffset + 4;
vcIndLen = 2;
dataOffset += 2;
}
if (isUdf || strcmp(method, "main") == 0)
{
lmParams[i].init(fsType,
0, // precision
0, // scale,
CharInfo::ISO88591,
CharInfo::DefaultCollation,
COM_INPUT_COLUMN,
FALSE, // objMap
RS_NONE,
dataOffset, // IN offset
argLen, // len
currParamOffset, // null offset
2, // null len
vcIndOffset, // vc offset
vcIndLen, // vc len
0, // OUT offset
0, // len
0, // null offset
0, // null len
0, // vc offset
0, // vc len
NULL); // name
}
else
{
lmParams[i].init(fsType,
0, // precision
0, // scale,
CharInfo::ISO88591,
CharInfo::DefaultCollation,
COM_INOUT_COLUMN,
FALSE, // objMap
RS_NONE,
dataOffset, // IN offset
argLen, // len
currParamOffset, // null offset
2, // null len
vcIndOffset, // vc offset
vcIndLen, // vc len
dataOffset, // OUT offset
argLen, // len
currParamOffset, // null offset
2, // null len
vcIndOffset, // vc offset
vcIndLen, // vc len
NULL); // name
}
// copy the value to inputRow
str_cpy_all(inputRow + dataOffset, argv[i], argLen);
if (useVarchar)
*(short *)(inputRow + vcIndOffset) = argLen;
// Move currParamOffset beyond the current parameter
currParamOffset += 2; // null indicator
currParamOffset += 2; // padding for alignment
currParamOffset += (useVarchar ? 2 : 0); // VC len
currParamOffset += argLen; // data
currParamOffset = ROUND8(currParamOffset);
} // for each input param
if (isUdf)
{
// Add the trailing output param
lmParams[i].init(fsType,
0, // precision
0, // scale,
CharInfo::ISO88591,
CharInfo::DefaultCollation,
COM_OUTPUT_COLUMN,
FALSE, // objMap
RS_NONE,
0, // IN offset
0, // len
0, // null offset
0, // null len
0, // vc offset
0, // vc len
(useVarchar ? 6 : 4), // OUT offset
maxUdfOutLen, // len
0, // null offset
2, // null len
(useVarchar ? 2 : 0), // vc offset
(useVarchar ? 2 : 0), // vc len
NULL); // name
} // if (isUdf)
} // if (result == LM_OK)
LmRoutine *lmRoutine = NULL;
i = 0;
ComRoutineExternalSecurity externalSecurity =
COM_ROUTINE_EXTERNAL_SECURITY_INVOKER;
Int32 routineOwnerId = ROOT_USER_ID; //dbRoot
while (result == LM_OK && i++ < nTimesToInvoke)
{
// Begin a transaction
if (txRequired && result == LM_OK)
{
Int32 transtag;
Int32 tmfResult;
tmfResult = BEGINTRANSACTION((Int32 *)&transtag);
if (tmfResult != 0)
{
fprintf(f, "%s BEGINTRANSACTION() returned %d\n",
prefix, tmfResult);
fflush(f);
result = LM_ERR;
}
else
{
fprintf(f, "%s Started a transaction\n", prefix);
fflush(f);
}
}
if (i == 1)
{
// Call LM::getRoutine()
if (result == LM_OK)
{
result = lm->getRoutine(numLmParams,
lmParams,
0, //numTableInfo
NULL,//tableInfo
NULL, // return value
(isJava ? COM_STYLE_JAVA_CALL : COM_STYLE_SQL),
(txRequired ? COM_TRANSACTION_REQUIRED : COM_NO_TRANSACTION_REQUIRED),
COM_MODIFIES_SQL,
"", // parentQid
inputRowLen,
outputRowLen,
method, // sqlName
method, // externalName
sig,
container,
path,
container,
glob.getCurrentUserName(),
glob.getSessionUserName(),
externalSecurity,
routineOwnerId,
&lmRoutine,
NULL,
NULL,
nResultSets,
diags);
if (result != LM_OK)
{
fprintf(f, "%s getRoutine() returned %s\n",
prefix, LmResultToString(result));
fflush(f);
}
DumpDiags(cout, diags, prefixPlusSpace);
}
}
// Call LM::invokeRoutine()
if (result == LM_OK && lmRoutine)
{
fprintf(f, "\n%s About to invoke the routine body...\n", prefix);
fflush(f);
result = lm->invokeRoutine(lmRoutine, inputRow, outputRow, diags);
if (result != LM_OK)
{
fprintf(f, "%s invokeRoutine() returned %s\n",
prefix, LmResultToString(result));
fflush(f);
}
else
{
fprintf(f, "%s The routine body has returned\n", prefix);
if (isUdf)
{
char *udfRetVal = outputRow + (useVarchar ? 6 : 4);
if (useVarchar)
{
LmParameter &p = lmParams[numLmParams - 1];
ComUInt32 actual = p.actualOutDataSize(outputRow);
if (actual > maxUdfOutLen)
actual = maxUdfOutLen;
udfRetVal[actual] = 0;
}
else
{
udfRetVal[maxUdfOutLen] = 0;
}
fprintf(f, "%s Return value [%s]\n", prefix, udfRetVal);
}
fflush(f);
}
DumpDiags(cout, diags, prefixPlusSpace);
}
// Quiesce the executor
if (txRequired && result == LM_OK)
{
cliResult = SQL_EXEC_Xact(SQLTRANS_QUIESCE, NULL);
if (cliResult < 0)
{
fprintf(f, "%s SQL_EXEC_Xact() returned %d\n",
prefix, cliResult);
fflush(f);
result = LM_ERR;
}
else
{
fprintf(f, "%s Quiesced the executor\n", prefix);
fflush(f);
}
}
// End the transaction
if (txRequired && result == LM_OK)
{
Int32 tmfResult = ENDTRANSACTION();
if (tmfResult != 0)
{
fprintf(f, "%s ENDTRANSACTION() returned %d\n",
prefix, tmfResult);
fflush(f);
result = LM_ERR;
}
else
{
fprintf(f, "%s Ended the transaction\n", prefix);
fflush(f);
}
}
}
if (result == LM_OK)
{
fprintf(f, "\n");
fprintf(f, "%s Freeing resources...\n", prefix);
fflush(f);
}
// Call LM::putRoutine()
if (result == LM_OK && lmRoutine)
{
result = lm->putRoutine(lmRoutine, diags);
if (result != LM_OK)
{
fprintf(f, "%s putRoutine() returned %s\n",
prefix, LmResultToString(result));
fflush(f);
}
else
{
fprintf(f, "%s putRoutine() was successful\n", prefix);
fflush(f);
}
DumpDiags(cout, diags, prefixPlusSpace);
}
// Destroy the LM and the JVM. This will flush any JVM output
// buffers such as those containing data from the -verbose or
// -Xrunhprof options.
fprintf(f, "%s About to shut down the LM\n", prefix);
fflush(f);
if (isJava)
glob.destroyJavaLMAndJVM();
else
glob.destroyCLM();
fflush(stderr);
fflush(stdout);
if (result == LM_OK)
{
fprintf(f, "%s Done\n\n", prefix);
}
else
{
fprintf(f, "%s Done. Errors were encountered.\n\n", prefix);
}
fflush(f);
// Release allocated memory
if (sig)
h->deallocateMemory(sig);
if (inputRow)
h->deallocateMemory(inputRow);
if (outputRow)
h->deallocateMemory(outputRow);
return result;
} // invokeUdrMethod()
// Dead Code
// Andy thinks we should retire this interface...
// Function to open a file and then repeatedly call a helper function
// that will process one command from that file.
static Int32 processCommandsFromFile(const char *filename, UdrGlobals &glob)
{
Int32 result = 0;
FILE *out = MXUDR_OUTFILE;
const char *prefix = MXUDR_PREFIX;
fprintf(out, "%s Commands will be processed from file %s\n",
prefix, filename);
FILE *in = fopen(filename, "r");
if (!in)
{
fprintf(out, "%s *** ERROR: Could not open input file %s: %s.\n",
prefix, filename, strerror(errno));
return 1;
}
while (processSingleCommandFromFile(in, glob) == 0)
{
}
fprintf(out, "\n");
return result;
}
// Function to process one command from a text file. Returns 0 if the
// command was successfully processed, non-zero otherwise. Error
// messages are written to MXUDR_OUTFILE if anything goes wrong.
static Int32 processSingleCommandFromFile(FILE *in, UdrGlobals &glob)
{
FILE *out = MXUDR_OUTFILE;
const char *prefix = MXUDR_PREFIX;
char target[1024];
char originalCommand[1024];
char *resultOfRead = NULL;
char *trimmed;
// The while loop will repeat until either we cannot read any more
// data from the file, or we have successfully extracted the next
// non-blank line.
NABoolean done = false;
while (!done)
{
// Read a line from the file
resultOfRead = fgets(target, 1024, in);
if (resultOfRead != target)
{
if (ferror(in))
{
fprintf(out, "%s *** ERROR: fgets failed due to an I/O error: %s.\n",
prefix, strerror(errno));
return 1;
}
return 1;
}
// Trim leading and trailing spaces
Int32 len = str_len(target);
trimmed = target;
if (len > 0)
{
while (isSpace8859_1(trimmed[0]))
{
trimmed++;
}
len = str_len(trimmed);
while (len > 0 && isSpace8859_1(trimmed[len - 1]))
{
trimmed[len - 1] = 0;
len--;
}
}
if (str_len(trimmed) > 0)
{
done = true;
}
}
// Save a copy of the original command
originalCommand[0] = 0;
str_cat(originalCommand, trimmed, originalCommand);
char *tok;
const char *delim = " ";
// Now we (very primitively) walk through each token in the command,
// doing some semantic checking along the way. Right now the only
// supported command is "invoke [-n N] method class path [arg]..."
// $$$$ TBD
// * GET THIS TO WORK WITH RESULT SETS
// * SUPPORT -notx OPTION
// * SUPPORT -udf OPTION
// * SUPPORT -param C
// * SUPPORT -vc
NABoolean txRequired = TRUE;
NABoolean isUdf = FALSE;
NABoolean isJava = TRUE;
NABoolean useVarchar = FALSE;
tok = strtok(trimmed, delim);
if (tok == NULL)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
if (stricmp(tok, "invoke") != 0)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
tok = strtok(NULL, delim);
if (tok == NULL)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
Int32 nTimesToInvoke = 1;
Int32 nResultSets = 0;
const char *paramStyle = "java";
if (stricmp(tok, "-n") == 0)
{
tok = strtok(NULL, delim);
if (tok == NULL)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
nTimesToInvoke = atoi(tok);
// Next token needs to be "-param <param style>"
tok = strtok(NULL, delim);
if (tok == NULL || stricmp(tok, "-param") != 0)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
tok = strtok(NULL, delim);
if (tok == NULL)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
paramStyle = tok;
tok = strtok(NULL, delim);
if (tok == NULL)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
}
const char *method = tok;
tok = strtok(NULL, delim);
if (tok == NULL)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
const char *container = tok;
tok = strtok(NULL, delim);
if (tok == NULL)
{
fprintf(out, "%s *** ERROR: Bad command: %s\n", prefix, originalCommand);
return 1;
}
const char *path = tok;
// Done with semantic checks. All remaining arguments will be input
// to the user's Java method. We collect them into the argv array.
const Int32 maxNumArgs = 1024;
char *argv[maxNumArgs];
Int32 argc = 0;
while ((tok = strtok(NULL, delim)) != NULL)
{
if (argc < maxNumArgs)
{
argv[argc] = tok;
}
argc++;
}
if (argc > maxNumArgs)
{
fprintf(out,
"%s *** ERROR: The number of arguments (%d) must not exceed %d\n",
prefix, argc, maxNumArgs);
return 1;
}
// Now we have a complete description of the method that the user
// wants us to invoke. Call a helper function to do the work.
const char *longLine =
"----------------------------------------------------------------------";
fprintf(out, "%s %s\n", prefix, longLine);
fprintf(out, "%s Command: %s\n", prefix, originalCommand);
fprintf(out, "%s %s\n", prefix, longLine);
Int32 result = invokeUdrMethod(method, container, path,
isJava, isUdf, txRequired, useVarchar,
argc, argv, nResultSets,
nTimesToInvoke, glob);
return result;
} // processSingleCommandFromFile
void udrAssert(const char *f, Int32 l, const char *m)
{
if(!f)
f = "";
if(!m)
m = "";
cout << "MXUDR Assertion: " << m << endl;
cout << "at FILE: " << f << " LINE: " << l << endl;
makeTFDSCall(m, f, l);
// should not reach here
}
void udrAbort(const char *f, Int32 l, const char *m)
{
if(!f)
f = "";
if(!m)
m = "";
cout << "MXUDR Abort: " << m << endl;
cout << "at FILE: " << f << " LINE: " << l << endl;
makeTFDSCall(m, f, l, FALSE);
// should not reach here
}
// Stubs
#ifdef UDR_OSS_RELEASE
#include "CharType.h"
#include "NAType.h"
NABoolean NAType::isComparable(const NAType &other,
ItemExpr *parentOp,
Int32 emitErr) const
{ return FALSE; }
NABoolean CharType::isComparable(const NAType &other,
ItemExpr *parentOp,
Int32 emitErr) const
{ return FALSE; }
#endif
//
// Function to do all the work of processing a session message. Right
// now the only session message we support is JVM startup options. Any
// other type of session message will be ignored.
//
void processASessionMessage(UdrGlobals *glob,
UdrServerReplyStream &msgStream,
UdrSessionMsg &request)
{
const char *moduleName = "processASessionMessage";
#ifdef UDR_OSS_DEBUG
// In the debug build we allow some environment settings to force
// the server to do some bad things such as exit and send signals to
// other processes. For testing purposes only.
//
// MXUDR_SESSION_CRASH Exit immediately
//
// MXUDR_SESSION_SIGNAL Assume the value is an OSS process ID
// and kill that process
//
char *crash = getenv("MXUDR_SESSION_CRASH");
if (crash && crash[0])
{
UDR_DEBUG0("*** About to call exit(1) because environment");
UDR_DEBUG1("*** variable MXUDR_SESSION_CRASH is set to '%s'.", crash);
exit(1);
}
char *pname = getenv("MXUDR_SESSION_SIGNAL");
if (pname && pname[0])
{
Int32 pnum = atoi(pname);
Int32 signum = SIGINT;
UDR_DEBUG2("*** About to call kill(%d,%d) because environment",
pnum, signum);
UDR_DEBUG1("*** variable MXUDR_SESSION_SIGNAL is set to '%s'.", pname);
Int32 result = kill(pnum, signum);
UDR_DEBUG1("*** kill() returned %d", result);
// Now sleep for 5 seconds. Sometimes the signal does not get
// delivered right away and we do not want the client to receive
// our reply message before the signal is processed.
const Int32 secondsToSleep = 5;
DELAY(secondsToSleep * 100);
}
#endif // UDR_OSS_DEBUG
if (request.getType() == UdrSessionMsg::UDR_SESSION_TYPE_JAVA_OPTIONS)
{
LmJavaOptions *javaOptions = glob->getJavaOptions();
// 1. Clear any existing options
javaOptions->removeAllOptions();
// 2. Move environment settings into javaOptions
InitializeJavaOptionsFromEnvironment(*javaOptions, glob->getUdrHeap());
if (!IsNAStringSpaceOrEmpty(initErrText) )
{
controlErrorReply(glob,
msgStream,
UDR_ERR_MESSAGE_PROCESSING, //errorNumber,
0, //intErrorInfo,
initErrText.data() //charErrorInfo
);
return;
}
// 3. Move options from the message into javaOptions so that they
// take precedence over environment settings. The message is
// expected to contain two strings. First is a string of delimited
// JVM startup options. Second is the set of delimiter characters
// (default is a single space).
ComUInt32 numStrings = request.numStrings();
if (numStrings >= 1)
{
const char *options = request.getString(0);
const char *delimiters = " ";
if (numStrings >= 2)
{
delimiters = request.getString(1);
}
javaOptions->addOptions(options, delimiters, FALSE);
}
}
// Build a reply and send it
UdrSessionReply *reply = new (glob->getIpcHeap())
UdrSessionReply(glob->getIpcHeap());
reply->setHandle(request.getHandle());
msgStream.clearAllObjects();
msgStream << *reply;
if (glob->verbose_ &&
glob->traceLevel_ >= TRACE_IPMS &&
glob->showUnload_)
{
ServerDebug("[UdrServ (%s)] Send Session Reply", moduleName);
}
sendControlReply(glob, msgStream, NULL);
reply->decrRefCount();
}
//
// Helper function to dump memory stats to the UDR debug stream. Right
// now this only works on NSK and is only enabled for debug builds.
//
static void DumpProcessInfo()
{
#ifndef UDR_OSS_DEBUG
return;
#else
short attr_list[] =
{ 38 // PIN
, 54 // Current Page Count
, 59 // Page Fault Count
, 76 // PFS size
, 77 // Maximum PFS size
, 109 // Size of global data
, 103 // Current main stack size
, 104 // Maximum main stack size
, 111 // Current size of native heap
, 112 // Maximum size of native heap
, 113 // Guaranteed Swap Size
, 119 // Process is TNS/R Native?
, 39 // Program file name
};
const short attr_count = sizeof(attr_list) / sizeof(attr_list[0]);
const short return_values_maxlen = 256;
short return_values_list[return_values_maxlen];
memset(return_values_list, 0, sizeof(return_values_list));
short return_values_len;
short rc, errordetail;
rc = PROCESS_GETINFOLIST_( // cpu
, // pin
, // nodename
, // nodename_length
, // process_handle
, attr_list
, attr_count
, return_values_list
, return_values_maxlen
, &return_values_len
, &errordetail
);
UDR_DEBUG0("[BEGIN DumpProcessInfo]");
if (rc)
{
UDR_DEBUG0(" *** Error occurred in PROCESS_GETINFOLIST_");
UDR_DEBUG2(" *** Return code %d, error detail %d",
(Int32) rc, (Int32) errordetail);
}
else
{
UDR_DEBUG1(" Process ID %d", (Int32) return_values_list[0]);
UDR_DEBUG1(" Program file %s", (char *) &return_values_list[22]);
UDR_DEBUG1(" Current pages %d", (Int32) return_values_list[1]);
UDR_DEBUG1(" Page faults %ld", *((Lng32 *) &return_values_list[2]));
UDR_DEBUG1(" PFS size %ld", *((Lng32 *) &return_values_list[4]));
UDR_DEBUG1(" Max PFS size %ld", *((Lng32 *) &return_values_list[6]));
UDR_DEBUG1(" Global data size %ld", *((Lng32 *) &return_values_list[8]));
UDR_DEBUG1(" Stack size %ld", *((Lng32 *) &return_values_list[10]));
UDR_DEBUG1(" Max stack size %ld",
*((Lng32 *) &return_values_list[12]));
UDR_DEBUG1(" Native heap size %ld", *((Lng32 *) &return_values_list[14]));
UDR_DEBUG1(" Max native heap size %ld",
*((Lng32 *) &return_values_list[16]));
UDR_DEBUG1(" Guaranteed swap space %ld",
*((Lng32 *) &return_values_list[18]));
UDR_DEBUG1(" TNS/R Native flag %d", (Int32) return_values_list[20]);
}
UDR_DEBUG0("[END DumpProcessInfo]");
#endif
}
// ENTER TX message processing. The message is given to corresponding
// spinfo. The message is not replied immediately and only replied
// when EXIT TX or SUSPEND TX arrives.
void processAnEnterTxMessage(UdrGlobals *glob,
UdrServerReplyStream &msgStream,
UdrEnterTxMsg &request)
{
const char *moduleName = "processAnEnterTxMessage";
SPInfo *sp = glob->getSPList()->spFind(request.getHandle());
if (sp)
{
// There should be only one connection associated with this message
// stream. We use that connection to pass on to TxStream.
UDR_ASSERT((msgStream.getRecipients().entries() == 1),
"A UdrServerReplyStream must be associated with a single Connection");
CollIndex i = 0;
msgStream.getRecipients().setToNext(i);
IpcConnection *conn = msgStream.getRecipients().element(i);
UdrServerReplyStream *stream = sp->getTxStream();
if (stream->getState() == IpcMessageStream::RECEIVED)
{
// A message is already waiting on the stream. This is an error
controlErrorReply(glob,
msgStream,
UDR_ERR_MESSAGE_PROCESSING,
0,
"An Enter TX message is already pending.");
return;
}
// Give connection that the EnterTX appeared on as second param.
msgStream.giveMessageTo(*stream, conn);
if (glob->verbose_ && glob->traceLevel_ >= TRACE_IPMS)
{
ServerDebug("[UdrServ (%s)] Enter TX message is given to SP %s_" INT64_SPEC,
moduleName,
sp->getSqlName(),
sp->getUdrHandle());
}
}
else
{
controlErrorReply(glob,
msgStream,
UDR_ERR_MISSING_UDRHANDLE,
0,
"Enter TX");
}
}
void processASuspendTxMessage(UdrGlobals *glob,
UdrServerReplyStream &msgStream,
UdrSuspendTxMsg &request)
{
const char *moduleName = "processASuspendTxMessage";
// First reply to Enter TX message
SPInfo *sp = glob->getSPList()->spFind(request.getHandle());
if (sp)
{
NABoolean doneWithRS = FALSE;
sp->replyToEnterTxMsg(doneWithRS);
}
else
{
controlErrorReply(glob,
msgStream,
UDR_ERR_MISSING_UDRHANDLE,
0,
"Suspend TX");
return;
}
UdrSuspendTxReply *reply = new (glob->getIpcHeap())
UdrSuspendTxReply(glob->getIpcHeap());
reply->setHandle(request.getHandle());
msgStream.clearAllObjects();
msgStream << *reply;
if (glob->verbose_ &&
glob->traceLevel_ >= TRACE_IPMS &&
glob->showUnload_)
{
ServerDebug("[UdrServ (%s)] Send SUSPEND TX Reply", moduleName);
}
// No need to quiesce executor for replying to this message, so
// set NULL at the third param.
sendControlReply(glob, msgStream, NULL);
reply->decrRefCount();
}
void processAnExitTxMessage(UdrGlobals *glob,
UdrServerReplyStream &msgStream,
UdrExitTxMsg &request)
{
const char *moduleName = "processAnExitTxMessage";
// First reply to Enter TX message
SPInfo *sp = glob->getSPList()->spFind(request.getHandle());
if (sp)
{
// Reply to Enter Tx.
NABoolean doneWithRS = TRUE;
sp->replyToEnterTxMsg(doneWithRS);
}
else
{
controlErrorReply(glob,
msgStream,
UDR_ERR_MISSING_UDRHANDLE,
0,
"Exit TX");
return;
}
UdrExitTxReply *reply = new (glob->getIpcHeap())
UdrExitTxReply(glob->getIpcHeap());
reply->setHandle(request.getHandle());
msgStream.clearAllObjects();
msgStream << *reply;
if (glob->verbose_ &&
glob->traceLevel_ >= TRACE_IPMS &&
glob->showUnload_)
{
ServerDebug("[UdrServ (%s)] Sending EXIT TX Reply", moduleName);
}
// No need to quiesce executor for replying to this message, so
// set NULL at the third param.
sendControlReply(glob, msgStream, NULL);
reply->decrRefCount();
}
Int32 PerformWaitedReplyToClient(UdrGlobals *UdrGlob,
UdrServerDataStream &msgStream)
{
//Most of the calls here is copied from runServer() above.
sendDataReply(UdrGlob, msgStream, NULL);
// cleanup no longer used buffers
msgStream.cleanupBuffers();
//Note down the current sp so that we can restore it back
//once ProcessARequest() call is complete. There are scenarios
//in ProcessARequest to reset spInfo in udrGlobals once Spinfo
//returns control back to udrserv.
SPInfo *sp = UdrGlob->getCurrSP();
//get control connection to IPC and env
GuaReceiveControlConnection *ctrlConn = UdrGlob->getControlConnection();
IpcEnvironment *env = UdrGlob->getIpcEnvironment();
env->deleteCompletedMessages();
NAList<UdrServerReplyStream *> &replyStreams =
UdrGlob->getReplyStreams();
// Make sure we wait for a new message, even it it takes multiple
// I/Os due to multi-chunking. Also try to avoid hanging at this
// point, due to errors.
while (!replyStreams.entries() &&
ctrlConn->getConnection()->receiveIOPending() &&
ctrlConn->getConnection()->getState() != IpcConnection::ERROR_STATE &&
ctrlConn->getNumRequestors() > 0)
{
// Nothing on the queue, go ahead and wait...
// we have performed a reply, now wait indefinitely
ctrlConn->wait(IpcInfiniteTimeout);
}
//Wait is over, continue processing.
if(!replyStreams.entries())
{
return SQLUDR_ERROR;
}
//expect only one reply stream at location 0 ?
UdrServerReplyStream *stream = replyStreams[0];
replyStreams.removeAt(0);
processARequest(UdrGlob, *stream, *env);
//Restore SpInfo to udrGlobal
UdrGlob->setCurrSP(sp);
stream->addToCompletedList();
return SQLUDR_SUCCESS;
}